You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2013/03/04 12:24:53 UTC

svn commit: r1452257 [5/14] - in /hbase/branches/0.94: security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/ src/main/jamon/org/apache/hadoop/hbase/tmpl/master/ src/main/java/org...

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * Member-side messaging for distributed Procedures.  Every method here (other than lifecycle and
+ * naming) delivers exactly one message from a procedure member to the coordinator.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProcedureMemberRpcs extends Closeable {
+
+  /**
+   * Set up whatever threads or connections this member needs before messages can flow.
+   * @param member the member that these rpcs will serve
+   */
+  void start(ProcedureMember member);
+
+  /**
+   * Name of this member.  Every {@link Subprocedure} executed here runs on behalf of this member.
+   * @return the member name
+   */
+  String getMemberName();
+
+  /**
+   * Tell the coordinator that this member aborted the given {@link Subprocedure}.
+   *
+   * @param sub the {@link Subprocedure} being aborted
+   * @param cause why the member's subprocedure aborted
+   * @throws IOException if the rpcs cannot reach the other members of the procedure (in which
+   *  case there is no way to recover).
+   */
+  void sendMemberAborted(Subprocedure sub, ForeignException cause) throws IOException;
+
+  /**
+   * Tell the coordinator that the given {@link Subprocedure} now holds its locally required
+   * barrier condition.
+   *
+   * @param sub the {@link Subprocedure} that acquired its local barrier
+   * @throws IOException if the coordinator cannot be reached
+   */
+  void sendMemberAcquired(Subprocedure sub) throws IOException;
+
+  /**
+   * Tell the coordinator that the given {@link Subprocedure} has finished the work it had to do
+   * while the global barrier was held.
+   *
+   * @param sub the {@link Subprocedure} that completed
+   * @throws IOException if the coordinator cannot be reached
+   */
+  void sendMemberCompleted(Subprocedure sub) throws IOException;
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
+import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
+
+/**
+ * Distributed procedure member's Subprocedure.  A procedure is started on a ProcedureCoordinator
+ * which communicates with ProcedureMembers who create and start their part of the Procedure.  This
+ * sub part is called a Subprocedure.
+ *
+ * Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this
+ * member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and
+ * {@link #cleanup(Exception)} (release state associated with subprocedure.)
+ *
+ * When submitted to a ProcedureMember, the call method is executed in a separate thread.
+ * Latches are used to block its progress and trigger continuations when barrier conditions are
+ * met.
+ *
+ * Any exception that escapes {@link #acquireBarrier()} or {@link #insideBarrier()} gets
+ * converted into a {@link ForeignException}, which will get propagated to the
+ * {@link ProcedureCoordinator}.
+ *
+ * There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific
+ * barrierName. (ex: snapshot121126).
+ */
+abstract public class Subprocedure implements Callable<Void> {
+  private static final Log LOG = LogFactory.getLog(Subprocedure.class);
+
+  // Name of the procedure
+  final private String barrierName;
+
+  //
+  // Execution state
+  //
+
+  /** counted down by {@link #receiveReachedGlobalBarrier()}; gates the in-barrier phase */
+  private final CountDownLatch inGlobalBarrier;
+  /** counted down when {@link #call()} exits, whether it succeeded or failed */
+  private final CountDownLatch releasedLocalBarrier;
+
+  //
+  // Error handling
+  //
+  /** monitor to check for errors */
+  protected final ForeignExceptionDispatcher monitor;
+  /** frequency to check for errors (ms) */
+  protected final long wakeFrequency;
+  /** execution timer built on {@link #monitor}; started and completed by {@link #call()} */
+  protected final TimeoutExceptionInjector executionTimeoutTimer;
+  /** used to report acquired / completed / aborted state to the coordinator */
+  protected final ProcedureMemberRpcs rpcs;
+
+  /** set to true once {@link #call()} has finished, successfully or not */
+  private volatile boolean complete = false;
+
+  /**
+   * @param member reference to the member managing this subprocedure
+   * @param procName name of the procedure this subprocedure is associated with
+   * @param monitor notified if there is an error in the subprocedure
+   * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in
+   *          milliseconds).
+   * @param timeout time in millis that will trigger a subprocedure abort if it has not completed
+   */
+  public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
+      long wakeFrequency, long timeout) {
+    // Asserts should be caught during unit testing
+    assert member != null : "procedure member should be non-null";
+    assert member.getRpcs() != null : "rpc handlers should be non-null";
+    assert procName != null : "procedure name should be non-null";
+    assert monitor != null : "monitor should be non-null";
+
+    this.rpcs = member.getRpcs();
+    this.barrierName = procName;
+    this.monitor = monitor;
+    // forward any failures to coordinator.  Since this is a dispatcher, resend loops should not be
+    // possible.
+    this.monitor.addListener(new ForeignExceptionListener() {
+      @Override
+      public void receive(ForeignException ee) {
+        // if this is a notification from a remote source, just log
+        if (ee.isRemote()) {
+          LOG.debug("Was remote foreign exception, not redispatching error", ee);
+          return;
+        }
+
+        // if it is local, then send it to the coordinator
+        try {
+          rpcs.sendMemberAborted(Subprocedure.this, ee);
+        } catch (IOException e) {
+          // this will fail all the running procedures, since the connection is down
+          LOG.error("Can't reach controller, not propagating error", e);
+        }
+      }
+    });
+
+    this.wakeFrequency = wakeFrequency;
+    this.inGlobalBarrier = new CountDownLatch(1);
+    this.releasedLocalBarrier = new CountDownLatch(1);
+
+    // accept error from timer thread, this needs to be started.
+    this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
+  }
+
+  public String getName() {
+     return barrierName;
+  }
+
+  public String getMemberName() {
+    return rpcs.getMemberName();
+  }
+
+  private void rethrowException() throws ForeignException {
+    monitor.rethrowException();
+  }
+
+  /**
+   * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods
+   * while keeping some state for other threads to access.
+   *
+   * This would normally be executed by the ProcedureMember when an acquire message comes from the
+   * coordinator.  Rpcs are used to send messages back to the coordinator after different phases
+   * are executed.  Any exception caught during the execution (including InterruptedException,
+   * which additionally restores the thread's interrupt flag) is passed to
+   * {@link #cancel(String, Throwable)}, which feeds the monitor; the monitor listener installed
+   * by the constructor forwards local errors to the coordinator via
+   * {@link ProcedureMemberRpcs#sendMemberAborted(Subprocedure, ForeignException)}.  Afterwards
+   * {@link #cleanup(Exception)} is run.
+   *
+   * This method always returns null and never throws: the {@code return} in the finally block
+   * discards anything still in flight, including a failure thrown by {@link #cleanup(Exception)}.
+   */
+  @Override
+  @SuppressWarnings("finally")
+  final public Void call() {
+    LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " +
+        executionTimeoutTimer.getMaxTime() + "ms");
+    // start the execution timeout timer
+    executionTimeoutTimer.start();
+
+    try {
+      // start by checking for error first
+      rethrowException();
+      LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
+      acquireBarrier();
+      LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
+
+      // vote yes to coordinator about being prepared
+      rpcs.sendMemberAcquired(this);
+      LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" +
+          " 'reached' or 'abort' from coordinator");
+
+      // wait for the procedure to reach global barrier before proceeding
+      waitForReachedGlobalBarrier();
+      rethrowException(); // if Coordinator aborts, will bail from here with exception
+
+      // In traditional 2PC, if a member reaches this state the TX has been committed and the
+      // member is responsible for rolling forward and recovering and completing the subsequent
+      // operations in the case of failure.  It cannot rollback.
+      //
+      // This implementation is not 2PC since it can still rollback here, and thus has different
+      // semantics.
+
+      LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
+      insideBarrier();
+      LOG.debug("Subprocedure '" + barrierName + "' locally completed");
+
+      // Ack that the member has executed and released local barrier
+      rpcs.sendMemberCompleted(this);
+      LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
+
+      // make sure we didn't get an external exception
+      rethrowException();
+    } catch (Exception e) {
+      String msg = null;
+      if (e instanceof InterruptedException) {
+        msg = "Procedure '" + barrierName + "' aborting due to interrupt!" +
+            " Likely due to pool shutdown.";
+        Thread.currentThread().interrupt();
+      } else if (e instanceof ForeignException) {
+        msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!";
+      } else {
+        msg = "Subprocedure '" + barrierName + "' failed!";
+      }
+      // cancel() already logs msg and e at ERROR level; logging here as well would report every
+      // failure twice.
+      cancel(msg, e);
+
+      LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
+      cleanup(e);
+    } finally {
+      // release anyone blocked in waitForLocallyCompleted(), on success and failure alike
+      releasedLocalBarrier.countDown();
+
+      // tell the timer we are done; this runs on both the success and failure paths
+      executionTimeoutTimer.complete();
+      complete = true;
+      LOG.debug("Subprocedure '" + barrierName + "' completed.");
+      return null;
+    }
+  }
+
+  boolean isComplete() {
+    return complete;
+  }
+
+  /**
+   * exposed for testing.
+   */
+  ForeignExceptionSnare getErrorCheckable() {
+    return this.monitor;
+  }
+
+  /**
+   * The implementation of this method should gather and hold required resources (locks, disk
+   * space, etc) to satisfy the Procedure's barrier condition.  For example, this would be where
+   * to quiesce all the regions on a RS for a procedure that requires all regions to be globally
+   * quiesced.
+   *
+   * Users should override this method.  If quiescence is not required, this is overkill but
+   * can still be used to execute a procedure on all members and to propagate any exceptions.
+   *
+   * @throws ForeignException
+   */
+  abstract public void acquireBarrier() throws ForeignException;
+
+  /**
+   * The implementation of this method should act with the assumption that the barrier condition
+   * has been satisfied.  Continuing the previous example, a condition could be that all RS's
+   * globally have been quiesced, and procedures that require this precondition could be
+   * implemented here.
+   *
+   * Users should override this method.  If quiescence is not required, this can be a no-op
+   *
+   * @throws ForeignException
+   */
+  abstract public void insideBarrier() throws ForeignException;
+
+  /**
+   * Users should override this method. The implementation of this method should rollback and
+   * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have
+   * created.  It is invoked from {@link #call()} only after a failure.
+   * @param e the exception that caused the subprocedure to fail
+   */
+  abstract public void cleanup(Exception e);
+
+  /**
+   * Method to cancel the Subprocedure by injecting an exception from an external source.
+   * The message and cause are logged at ERROR level, then the cause (wrapped in a
+   * {@link ForeignException} tagged with this member's name if it is not one already) is handed
+   * to the monitor.
+   * @param msg description of why the subprocedure is being cancelled
+   * @param cause the error that triggered the cancellation
+   */
+  public void cancel(String msg, Throwable cause) {
+    LOG.error(msg, cause);
+    if (cause instanceof ForeignException) {
+      monitor.receive((ForeignException) cause);
+    } else {
+      monitor.receive(new ForeignException(getMemberName(), cause));
+    }
+  }
+
+  /**
+   * Callback for the member rpcs to call when the global barrier has been reached.  This
+   * unblocks the main subprocedure execution thread so that the Subprocedure's
+   * {@link #insideBarrier()} method can be run.
+   */
+  public void receiveReachedGlobalBarrier() {
+    inGlobalBarrier.countDown();
+  }
+
+  //
+  // Subprocedure Internal State interface
+  //
+
+  /**
+   * Wait for the reached global barrier notification.
+   *
+   * Package visibility for testing
+   *
+   * @throws ForeignException
+   * @throws InterruptedException
+   */
+  void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
+    Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
+        barrierName + ":remote acquired");
+  }
+
+  /**
+   * Waits until this subprocedure's {@link #call()} has finished locally (the latch is released
+   * in its finally block, so this covers both success and failure).  The waiting itself is
+   * delegated to Procedure.waitForLatch with the monitor and wake frequency; NOTE(review):
+   * presumably that rethrows any error the monitor has recorded — confirm in Procedure.
+   * @throws ForeignException
+   * @throws InterruptedException
+   */
+  public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
+    Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
+        barrierName + ":completed");
+  }
+
+  /**
+   * Empty Subprocedure for testing.
+   *
+   * Must be public for stubbing used in testing to work.
+   */
+  public static class SubprocedureImpl extends Subprocedure {
+
+    public SubprocedureImpl(ProcedureMember member, String opName,
+        ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
+      super(member, opName, monitor, wakeFrequency, timeout);
+    }
+
+    @Override
+    public void acquireBarrier() throws ForeignException {}
+
+    @Override
+    public void insideBarrier() throws ForeignException {}
+
+    @Override
+    public void cleanup(Exception e) {}
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/SubprocedureFactory.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Builder that a {@link ProcedureMember} uses to create the {@link Subprocedure}s it runs.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface SubprocedureFactory {
+
+  /**
+   * Create the {@link Subprocedure} for a requested procedure.
+   *
+   * @param procName name of the procedure the new subprocedure belongs to
+   * @param procArgs arguments for the procedure, as sent by the coordinator
+   * @return the {@link Subprocedure} to run, or <tt>null</tt> if no operation should be run
+   * @throws IllegalArgumentException if the request is malformed and so cannot be run
+   * @throws IllegalStateException if the current runner is unable to accept new requests
+   */
+  Subprocedure buildSubprocedure(String procName, byte[] procArgs);
+}

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
+  public static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
+  /** znode helper; only created by {@link #start(ProcedureCoordinator)}, null until then */
+  private ZKProcedureUtil zkProc = null;
+  protected ProcedureCoordinator coordinator = null;  // if started this should be non-null
+
+  ZooKeeperWatcher watcher;
+  String procedureType;
+  String coordName;
+
+  /**
+   * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
+   * @param procedureClass procedure type name is a category for when there are multiple kinds of
+   *    procedures.-- this becomes a znode so be aware of the naming restrictions
+   * @param coordName name of the node running the coordinator
+   * @throws KeeperException if an unexpected zk error occurs (the constructor itself only stores
+   *    its arguments; ZooKeeper is not touched until {@link #start(ProcedureCoordinator)})
+   */
+  public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
+      String procedureClass, String coordName) throws KeeperException {
+    this.watcher = watcher;
+    this.procedureType = procedureClass;
+    this.coordName = coordName;
+  }
+
+  /**
+   * The "acquire" phase.  First sets a watch on (and checks) the procedure's abort znode,
+   * aborting right away if it already exists.  Then creates the procType/acquired/procName znode
+   * holding <tt>info</tt> and sets an existence watch on each member's child znode under it.
+   * Members whose znode already exists are reported to the coordinator immediately; the others
+   * are reported by the nodeCreated callback installed in {@link #start(ProcedureCoordinator)}.
+   *
+   * @param proc the Procedure
+   * @param info data to be stored in the acquire node
+   * @param nodeNames children of the acquire phase
+   * @throws IOException if any failure occurs.
+   */
+  @Override
+  final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
+      throws IOException, IllegalArgumentException {
+    String procName = proc.getName();
+    // start watching for the abort node
+    String abortNode = zkProc.getAbortZNode(procName);
+    try {
+      // check to see if the abort node already exists
+      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
+        abort(abortNode);
+      }
+      // If we get an abort node watch triggered here, we'll go complete creating the acquired
+      // znode but then handle the acquire znode and bail out
+    } catch (KeeperException e) {
+      LOG.error("Failed to watch abort", e);
+      throw new IOException("Failed while watching abort node:" + abortNode, e);
+    }
+
+    // create the acquire barrier
+    String acquire = zkProc.getAcquiredBarrierNode(procName);
+    LOG.debug("Creating acquire znode:" + acquire);
+    try {
+      // notify all the procedure listeners to look for the acquire node
+      byte[] data = ProtobufUtil.prependPBMagic(info);
+      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
+      // loop through all the children of the acquire phase and watch for them
+      for (String node : nodeNames) {
+        String znode = ZKUtil.joinZNode(acquire, node);
+        LOG.debug("Watching for acquire node:" + znode);
+        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
+          coordinator.memberAcquiredBarrier(procName, node);
+        }
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed while creating acquire node:" + acquire, e);
+    }
+  }
+
+  /**
+   * The "reached" phase.  Creates the procedure's reached-barrier znode and sets an existence
+   * watch on each member's child znode under it; members whose znode already exists are
+   * reported to the coordinator as finished immediately.
+   *
+   * @param proc the Procedure
+   * @param nodeNames members expected to report completion
+   * @throws IOException if the znodes cannot be created or watched
+   */
+  @Override
+  public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
+    String procName = proc.getName();
+    String reachedNode = zkProc.getReachedBarrierNode(procName);
+    LOG.debug("Creating reached barrier zk node:" + reachedNode);
+    try {
+      // create the reached znode and watch for the reached znodes
+      ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
+      // loop through all the children of the reached phase and watch for them
+      for (String node : nodeNames) {
+        String znode = ZKUtil.joinZNode(reachedNode, node);
+        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
+          coordinator.memberFinishedBarrier(procName, node);
+        }
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed while creating reached node:" + reachedNode, e);
+    }
+  }
+
+
+  /**
+   * Delete znodes that are no longer in use.  Retries for as long as the delete fails with
+   * NotEmptyException, since members may still be adding children while we delete.
+   */
+  @Override
+  final public void resetMembers(Procedure proc) throws IOException {
+    String procName = proc.getName();
+    boolean stillGettingNotifications = false;
+    do {
+      try {
+        LOG.debug("Attempting to clean out zk node for op:" + procName);
+        zkProc.clearZNodes(procName);
+        stillGettingNotifications = false;
+      } catch (KeeperException.NotEmptyException e) {
+        // recursive delete isn't transactional (yet) so we need to deal with cases where we get
+        // children trickling in
+        stillGettingNotifications = true;
+      } catch (KeeperException e) {
+        throw new IOException("Failed to complete reset procedure " + procName, e);
+      }
+    } while (stillGettingNotifications);
+  }
+
+  /**
+   * Start monitoring znodes in ZK.  Installs a {@link ZKProcedureUtil} whose nodeCreated callback
+   * forwards creation of acquired, reached and abort znodes to the coordinator, then calls
+   * clearChildZNodes() on it.
+   * @param coordinator the coordinator to notify; may only be set once
+   * @return true if succeed, false if encountered initialization errors.
+   * @throws IllegalStateException if this instance has already been started
+   */
+  final public boolean start(final ProcedureCoordinator coordinator) {
+    if (this.coordinator != null) {
+      throw new IllegalStateException(
+        "ZKProcedureCoordinator already started and already has listener installed");
+    }
+    this.coordinator = coordinator;
+
+    try {
+      this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
+        @Override
+        public void nodeCreated(String path) {
+          if (!isInProcedurePath(path)) return;
+          LOG.debug("Node created: " + path);
+          logZKTree(this.baseZNode);
+          if (isAcquiredPathNode(path)) {
+            // node wasn't present when we created the watch so zk event triggers acquire
+            coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
+              ZKUtil.getNodeName(path));
+          } else if (isReachedPathNode(path)) {
+            // node was absent when we created the watch so zk event triggers the finished barrier.
+
+            // TODO Nothing prevents acquire and reached znodes from showing up in the wrong order.
+            coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
+              ZKUtil.getNodeName(path));
+          } else if (isAbortPathNode(path)) {
+            abort(path);
+          }
+        }
+      };
+      zkProc.clearChildZNodes();
+    } catch (KeeperException e) {
+      LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
+      return false;
+    }
+
+    LOG.debug("Starting the controller for procedure member:" + zkProc.getMemberName());
+    return true;
+  }
+
+  /**
+   * This is the abort message being sent by the coordinator to member.  Writes the serialized
+   * exception (source defaults to the coordinator name) into the procedure's abort znode; zk
+   * failures are reported through coordinator.rpcConnectionFailure instead of being thrown.
+   *
+   * TODO this code isn't actually used but can be used to issue a cancellation from the
+   * coordinator.
+   */
+  @Override
+  final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
+    String procName = proc.getName();
+    LOG.debug("Aborting procedure '" + procName + "' in zk");
+    String procAbortNode = zkProc.getAbortZNode(procName);
+    try {
+      LOG.debug("Creating abort znode:" + procAbortNode);
+      String source = (ee.getSource() == null) ? coordName : ee.getSource();
+      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
+      // first create the znode for the procedure
+      ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
+      LOG.debug("Finished creating abort node:" + procAbortNode);
+    } catch (KeeperException e) {
+      // possible that we get this error for the procedure if we already reset the zk state, but in
+      // that case we should still get an error for that procedure anyways
+      zkProc.logZKTree(zkProc.baseZNode);
+      coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
+          + " to abort procedure '" + procName + "'", new IOException(e));
+    }
+  }
+
+  /**
+   * Receive a notification and propagate it to the local coordinator.  The procedure is always
+   * aborted with a non-null {@link ForeignException}: the deserialized remote error when the
+   * abort znode holds a readable payload, otherwise a locally built one describing why it could
+   * not be read.
+   * @param abortNode full znode path to the failed procedure information
+   */
+  protected void abort(String abortNode) {
+    String procName = ZKUtil.getNodeName(abortNode);
+    ForeignException ee = null;
+    try {
+      byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
+      // Guard a missing payload explicitly instead of relying on isPBMagicPrefix to tolerate
+      // null.  NOTE(review): presumably getData yields null if the znode vanished before the
+      // read — confirm in ZKUtil.
+      if (data == null || !ProtobufUtil.isPBMagicPrefix(data)) {
+        LOG.warn("Got an error notification for op:" + abortNode
+            + " but we can't read the information. Killing the procedure.");
+        // we got a remote exception, but we can't describe it
+        ee = new ForeignException(coordName, "Data in abort node is illegally formatted.  ignoring content.");
+      } else {
+
+        data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+        ee = ForeignException.deserialize(data);
+      }
+    } catch (InvalidProtocolBufferException e) {
+      LOG.warn("Got an error notification for op:" + abortNode
+          + " but we can't read the information. Killing the procedure.");
+      // we got a remote exception, but we can't describe it
+      ee = new ForeignException(coordName, e);
+    } catch (KeeperException e) {
+      IOException cause = new IOException(e);
+      coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
+          + zkProc.getAbortZnode(), cause);
+      // Previously ee stayed null here and the procedure was aborted with a null reason; give
+      // it the connection failure as its cause instead.
+      ee = new ForeignException(coordName, cause);
+    }
+    coordinator.abortProcedure(procName, ee);
+  }
+
+  /**
+   * Close the underlying {@link ZKProcedureUtil}.  Safe to call on an instance that was never
+   * started: zkProc is only created by {@link #start(ProcedureCoordinator)}, so there is nothing
+   * to close yet and this is a no-op (it used to throw NullPointerException).
+   */
+  @Override
+  final public void close() throws IOException {
+    if (zkProc != null) {
+      zkProc.close();
+    }
+  }
+
+  /**
+   * Used in testing
+   */
+  final ZKProcedureUtil getZkProcedureUtil() {
+    return zkProc;
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ZooKeeper based controller for a procedure member.
+ * <p>
+ * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
+ * since each procedure type is bound to a single set of znodes. You can have multiple
+ * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
+ * name, but each individual rpcs is still bound to a single member name (and since they are
+ * used to determine global progress, its important to not get this wrong).
+ * <p>
+ * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
+ * time (as long as they have different types), from the same controller, but the same node name
+ * must be used for each procedure (though there is no conflict between the two procedure as long
+ * as they have distinct names).
+ * <p>
+ * There is no real error recovery with this mechanism currently -- if any the coordinator fails,
+ * its re-initialization will delete the znodes and require all in progress subprocedures to start
+ * anew.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
+
+  private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
+  private final String memberName;
+
+  protected ProcedureMember member;
+  private ZKProcedureUtil zkController;
+
+  /**
+   * Must call {@link #start(ProcedureMember)} before this can be used.
+   * @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
+   *          {@link #close()}.
+   * @param procType name of the znode describing the procedure type
+   * @param memberName name of the member to join the procedure
+   * @throws KeeperException if we can't reach zookeeper
+   */
+  public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher,
+      String procType, String memberName) throws KeeperException {
+    this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
+      @Override
+      public void nodeCreated(String path) {
+        if (!isInProcedurePath(path)) {
+          return;
+        }
+
+        LOG.info("Received created event:" + path);
+        // if it is a simple start/end/abort then we just rewatch the node
+        if (isAcquiredNode(path)) {
+          waitForNewProcedures();
+          return;
+        } else if (isAbortNode(path)) {
+          watchForAbortedProcedures();
+          return;
+        }
+        String parent = ZKUtil.getParent(path);
+        // if its the end barrier, the procedure can be completed
+        if (isReachedNode(parent)) {
+          receivedReachedGlobalBarrier(path);
+          return;
+        } else if (isAbortNode(parent)) {
+          abort(path);
+          return;
+        } else if (isAcquiredNode(parent)) {
+          startNewSubprocedure(path);
+        } else {
+          LOG.debug("Ignoring created notification for node:" + path);
+        }
+      }
+
+      @Override
+      public void nodeChildrenChanged(String path) {
+        if (path.equals(this.acquiredZnode)) {
+          LOG.info("Received procedure start children changed event: " + path);
+          waitForNewProcedures();
+        } else if (path.equals(this.abortZnode)) {
+          LOG.info("Received procedure abort children changed event: " + path);
+          watchForAbortedProcedures();
+        }
+      }
+    };
+    this.memberName = memberName;
+  }
+
+  public ZKProcedureUtil getZkController() {
+    return zkController;
+  }
+
+  @Override
+  public String getMemberName() {
+    return memberName;
+  }
+
+  /**
+   * Pass along the procedure global barrier notification to any listeners
+   * @param path full znode path that cause the notification
+   */
+  private void receivedReachedGlobalBarrier(String path) {
+    LOG.debug("Recieved reached global barrier:" + path);
+    String procName = ZKUtil.getNodeName(path);
+    this.member.receivedReachedGlobalBarrier(procName);
+  }
+
+  private void watchForAbortedProcedures() {
+    LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
+    try {
+      // this is the list of the currently aborted procedues
+      for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
+        zkController.getAbortZnode())) {
+        String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
+        abort(abortNode);
+      }
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("Failed to list children for abort node:"
+          + zkController.getAbortZnode(), new IOException(e));
+    }
+  }
+
+  private void waitForNewProcedures() {
+    // watch for new procedues that we need to start subprocedures for
+    LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
+    List<String> runningProcedures = null;
+    try {
+      runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
+        zkController.getAcquiredBarrier());
+      if (runningProcedures == null) {
+        LOG.debug("No running procedures.");
+        return;
+      }
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("General failure when watching for new procedures",
+        new IOException(e));
+    }
+    if (runningProcedures == null) {
+      LOG.debug("No running procedures.");
+      return;
+    }
+    for (String procName : runningProcedures) {
+      // then read in the procedure information
+      String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
+      startNewSubprocedure(path);
+    }
+  }
+
+  /**
+   * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
+   * <p>
+   * Will attempt to create the same procedure multiple times if an procedure znode with the same
+   * name is created. It is left up the coordinator to ensure this doesn't occur.
+   * @param path full path to the znode for the procedure to start
+   */
+  private synchronized void startNewSubprocedure(String path) {
+    LOG.debug("Found procedure znode: " + path);
+    String opName = ZKUtil.getNodeName(path);
+    // start watching for an abort notification for the procedure
+    String abortZNode = zkController.getAbortZNode(opName);
+    try {
+      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
+        LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
+        return;
+      }
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
+          + ") for procedure :" + opName, new IOException(e));
+      return;
+    }
+
+    // get the data for the procedure
+    Subprocedure subproc = null;
+    try {
+      byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
+      LOG.debug("start proc data length is " + data.length);
+      if (!ProtobufUtil.isPBMagicPrefix(data)) {
+        String msg = "Data in for starting procuedure " + opName + " is illegally formatted. "
+            + "Killing the procedure.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+      LOG.debug("Found data for znode:" + path);
+      subproc = member.createSubprocedure(opName, data);
+      member.submitSubprocedure(subproc);
+    } catch (IllegalArgumentException iae ) {
+      LOG.error("Illegal argument exception", iae);
+      sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
+    } catch (IllegalStateException ise) {
+      LOG.error("Illegal state exception ", ise);
+      sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
+        new IOException(e));
+    }
+  }
+
+  /**
+   * This attempts to create an acquired state znode for the procedure (snapshot name).
+   *
+   * It then looks for the reached znode to trigger in-barrier execution.  If not present we
+   * have a watcher, if present then trigger the in-barrier action.
+   */
+  @Override
+  public void sendMemberAcquired(Subprocedure sub) throws IOException {
+    String procName = sub.getName();
+    try {
+      LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
+          + ") in zk");
+      String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
+        zkController, procName), memberName);
+      ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
+
+      // watch for the complete node for this snapshot
+      String reachedBarrier = zkController.getReachedBarrierNode(procName);
+      LOG.debug("Watch for global barrier reached:" + reachedBarrier);
+      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
+        receivedReachedGlobalBarrier(reachedBarrier);
+      }
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
+          + procName + " and member: " + memberName, new IOException(e));
+    }
+  }
+
+  /**
+   * This acts as the ack for a completed snapshot
+   */
+  @Override
+  public void sendMemberCompleted(Subprocedure sub) throws IOException {
+    String procName = sub.getName();
+    LOG.debug("Marking procedure  '" + procName + "' completed for member '" + memberName
+        + "' in zk");
+    String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
+    try {
+      ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath);
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("Failed to post zk node:" + joinPath
+          + " to join procedure barrier.", new IOException(e));
+    }
+  }
+
+  /**
+   * This should be called by the member and should write a serialized root cause exception as
+   * to the abort znode.
+   */
+  @Override
+  public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
+    if (sub == null) {
+      LOG.error("Failed due to null subprocedure", ee);
+      return;
+    }
+    String procName = sub.getName();
+    LOG.debug("Aborting procedure (" + procName + ") in zk");
+    String procAbortZNode = zkController.getAbortZNode(procName);
+    try {
+      String source = (ee.getSource() == null) ? memberName: ee.getSource();
+      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
+      ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
+      LOG.debug("Finished creating abort znode:" + procAbortZNode);
+    } catch (KeeperException e) {
+      // possible that we get this error for the procedure if we already reset the zk state, but in
+      // that case we should still get an error for that procedure anyways
+      zkController.logZKTree(zkController.getBaseZnode());
+      member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
+          + " to abort procedure", new IOException(e));
+    }
+  }
+
+  /**
+   * Pass along the found abort notification to the listener
+   * @param abortZNode full znode path to the failed procedure information
+   */
+  protected void abort(String abortZNode) {
+    LOG.debug("Aborting procedure member for znode " + abortZNode);
+    String opName = ZKUtil.getNodeName(abortZNode);
+    try {
+      byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
+
+      // figure out the data we need to pass
+      ForeignException ee;
+      try {
+        if (!ProtobufUtil.isPBMagicPrefix(data)) {
+          String msg = "Illegally formatted data in abort node for proc " + opName
+              + ".  Killing the procedure.";
+          LOG.error(msg);
+          // we got a remote exception, but we can't describe it so just return exn from here
+          ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
+        } else {
+          data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
+          ee = ForeignException.deserialize(data);
+        }
+      } catch (InvalidProtocolBufferException e) {
+        LOG.warn("Got an error notification for op:" + opName
+            + " but we can't read the information. Killing the procedure.");
+        // we got a remote exception, but we can't describe it so just return exn from here
+        ee = new ForeignException(getMemberName(), e);
+      }
+
+      this.member.receiveAbortProcedure(opName, ee);
+    } catch (KeeperException e) {
+      member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
+          + zkController.getAbortZnode(), new IOException(e));
+    }
+  }
+
+  public void start(ProcedureMember listener) {
+    LOG.debug("Starting procedure member '" + this.memberName + "'");
+    this.member = listener;
+    watchForAbortedProcedures();
+    waitForNewProcedures();
+  }
+
+  @Override
+  public void close() throws IOException {
+    zkController.close();
+  }
+
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Shared ZooKeeper-based znode management utilities for distributed procedures.  All znode
+ * operations should go through the provided methods in coordinators and members.
+ *
+ * Layout of nodes in ZK is
+ * /hbase/[op name]/acquired/
+ *                    [op instance] - op data/
+ *                        /[nodes that have acquired]
+ *                 /reached/
+ *                    [op instance]/
+ *                        /[nodes that have completed]
+ *                 /abort/
+ *                    [op instance] - failure data
+ *
+ * NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
+ *
+ * Assumption here that procedure names are unique
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class ZKProcedureUtil
+    extends ZooKeeperListener implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
+
+  public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
+  public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
+  public static final String ABORT_ZNODE_DEFAULT = "abort";
+
+  /** ZooKeeper paths are '/'-delimited; used to test parent/child relationships. */
+  private static final char ZNODE_SEPARATOR = '/';
+
+  public final String baseZNode;
+  protected final String acquiredZnode;
+  protected final String reachedZnode;
+  protected final String abortZnode;
+
+  protected final String memberName;
+
+  /**
+   * Top-level watcher/controller for procedures across the cluster.
+   * <p>
+   * On instantiation, this ensures the procedure znodes exist.  This however requires the passed in
+   *  watcher has been started.
+   * @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
+   *          {@link #close()}
+   * @param procDescription name of the znode describing the procedure to run
+   * @param memberName name of the member from which we are interacting with running procedures
+   * @throws KeeperException when the procedure znodes cannot be created
+   */
+  public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
+      String memberName) throws KeeperException {
+    super(watcher);
+    this.memberName = memberName;
+    // make sure we are listening for events
+    watcher.registerListener(this);
+    // setup paths for the zknodes used in procedures
+    this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
+    acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
+    reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
+    abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
+
+    // first make sure all the ZK nodes exist
+    // make sure all the parents exist (sometimes not the case in tests)
+    ZKUtil.createWithParents(watcher, acquiredZnode);
+    // regular create because all the parents exist
+    ZKUtil.createAndFailSilent(watcher, reachedZnode);
+    ZKUtil.createAndFailSilent(watcher, abortZnode);
+  }
+
+  /**
+   * Closes the {@link ZooKeeperWatcher} passed to the constructor.
+   * @throws IOException per the {@link Closeable} contract
+   */
+  @Override
+  public void close() throws IOException {
+    if (watcher != null) {
+      watcher.close();
+    }
+  }
+
+  /** @return full path of the acquired-barrier znode for the given procedure instance */
+  public String getAcquiredBarrierNode(String opInstanceName) {
+    return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
+  }
+
+  /** @return full path of the reached-barrier znode for the given procedure instance */
+  public String getReachedBarrierNode(String opInstanceName) {
+    return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
+  }
+
+  /** @return full path of the abort znode for the given procedure instance */
+  public String getAbortZNode(String opInstanceName) {
+    return ZKProcedureUtil.getAbortNode(this, opInstanceName);
+  }
+
+  /** @return the parent abort znode shared by all instances of this procedure type */
+  public String getAbortZnode() {
+    return abortZnode;
+  }
+
+  /** @return the base znode of this procedure type */
+  public String getBaseZnode() {
+    return baseZNode;
+  }
+
+  /** @return the parent acquired-barrier znode shared by all instances of this procedure type */
+  public String getAcquiredBarrier() {
+    return acquiredZnode;
+  }
+
+  public String getMemberName() {
+    return memberName;
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator to trigger a global barrier
+   * acquire on each subprocedure.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the prepare barrier/start node
+   */
+  public static String getAcquireBarrierNode(ZKProcedureUtil controller,
+      String opInstanceName) {
+    return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator to trigger a global barrier
+   * execution and release on each subprocedure.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the commit barrier
+   */
+  public static String getReachedBarrierNode(ZKProcedureUtil controller,
+      String opInstanceName) {
+    return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
+  }
+
+  /**
+   * Get the full znode path for the node used by the coordinator or member to trigger an abort
+   * of the global barrier acquisition or execution in subprocedures.
+   * @param controller controller running the procedure
+   * @param opInstanceName name of the running procedure instance (not the procedure description).
+   * @return full znode path to the abort znode
+   */
+  public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
+    return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
+  }
+
+  public ZooKeeperWatcher getWatcher() {
+    return watcher;
+  }
+
+  /**
+   * @return true if {@code path} lies strictly below {@code parent}. Requires a separator after
+   *         the parent, so a sibling that merely shares a name prefix (e.g. {@code /a/abortX}
+   *         versus {@code /a/abort}) is not treated as a descendant.
+   */
+  private static boolean isStrictlyUnder(String path, String parent) {
+    return path.startsWith(parent + ZNODE_SEPARATOR);
+  }
+
+  /**
+   * Is this a procedure related znode path?
+   *
+   * @return true if the path is the base znode itself or lies below it
+   */
+  boolean isInProcedurePath(String path) {
+    return path.equals(baseZNode) || isStrictlyUnder(path, baseZNode);
+  }
+
+  /**
+   * Is this the exact procedure barrier acquired znode
+   */
+  boolean isAcquiredNode(String path) {
+    return path.equals(acquiredZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier acquired znode path (below it, not the node itself)
+   */
+  boolean isAcquiredPathNode(String path) {
+    return isStrictlyUnder(path, acquiredZnode);
+  }
+
+  /**
+   * Is this the exact procedure barrier reached znode
+   */
+  boolean isReachedNode(String path) {
+    return path.equals(reachedZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier reached znode path (below it, not the node itself)
+   */
+  boolean isReachedPathNode(String path) {
+    return isStrictlyUnder(path, reachedZnode);
+  }
+
+  /**
+   * Is this the exact procedure barrier abort znode
+   */
+  boolean isAbortNode(String path) {
+    return path.equals(abortZnode);
+  }
+
+  /**
+   * Is this in the procedure barrier abort znode path (below it, not the node itself)
+   */
+  public boolean isAbortPathNode(String path) {
+    return isStrictlyUnder(path, abortZnode);
+  }
+
+  // --------------------------------------------------------------------------
+  // internal debugging methods
+  // --------------------------------------------------------------------------
+  /**
+   * Recursively print the current state of ZK (non-transactional) at debug level. Best effort:
+   * a zookeeper error while walking the tree is logged, never thrown, because callers may
+   * invoke this while already handling a zookeeper failure.
+   * @param root name of the root directory in zk to print
+   */
+  void logZKTree(String root) {
+    if (!LOG.isDebugEnabled()) return;
+    LOG.debug("Current zk system:");
+    String prefix = "|-";
+    LOG.debug(prefix + root);
+    try {
+      logZKTree(root, prefix);
+    } catch (KeeperException e) {
+      // do not let a debugging aid replace the caller's original failure with a RuntimeException
+      LOG.warn("Failed to log the zk tree under " + root, e);
+    }
+  }
+
+  /**
+   * Helper method to print the current state of the ZK tree.
+   * @see #logZKTree(String)
+   * @throws KeeperException if an unexpected exception occurs
+   */
+  protected void logZKTree(String root, String prefix) throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
+    if (children == null) return;
+    for (String child : children) {
+      LOG.debug(prefix + child);
+      String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
+      logZKTree(node, prefix + "---");
+    }
+  }
+
+  /**
+   * Deletes every child of the acquired, reached and abort znodes, keeping those three parents.
+   * @throws KeeperException if a deletion fails
+   */
+  public void clearChildZNodes() throws KeeperException {
+    // TODO This is potentially racy since not atomic. update when we support zk that has multi
+    LOG.info("Clearing all procedure znodes: " + acquiredZnode + " " + reachedZnode + " "
+        + abortZnode);
+
+    // If the coordinator was shutdown mid-procedure, then we are going to lose
+    // an procedure that was previously started by cleaning out all the previous state. Its much
+    // harder to figure out how to keep an procedure going and the subject of HBASE-5487.
+    ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
+    ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
+    ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
+  }
+
+  /**
+   * Recursively deletes the acquired, reached and abort znodes of a single procedure instance.
+   * @param procedureName name of the procedure instance
+   * @throws KeeperException if a deletion fails
+   */
+  public void clearZNodes(String procedureName) throws KeeperException {
+    // TODO This is potentially racy since not atomic. update when we support zk that has multi
+    LOG.info("Clearing all znodes for procedure " + procedureName + " including nodes "
+        + acquiredZnode + " " + reachedZnode + " " + abortZnode);
+    ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
+    ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
+    ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
+  }
+}
\ No newline at end of file

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1452257&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Mon Mar  4 11:24:50 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Protobufs utility.
+ */
+@SuppressWarnings("deprecation")
+public final class ProtobufUtil {
+
+  private ProtobufUtil() {
+  }
+
+  /**
+   * Magic we put ahead of a serialized protobuf message.
+   * For example, all znode content is protobuf messages with the below magic
+   * for preamble.
+   * <p>
+   * NOTE(review): this array is public and therefore mutable by any caller; treat it as
+   * read-only.
+   */
+  public static final byte [] PB_MAGIC = new byte [] {'P', 'B', 'U', 'F'};
+
+  /**
+   * Prepend the passed bytes with four bytes of magic, {@link #PB_MAGIC}, to flag what
+   * follows as a protobuf in hbase.  Prepend these bytes to all content written to znodes, etc.
+   * @param bytes Bytes to decorate
+   * @return The passed <code>bytes</code> with magic prepended (creates a new
+   * byte array that is <code>bytes.length</code> plus {@link #PB_MAGIC}.length).
+   */
+  public static byte [] prependPBMagic(final byte [] bytes) {
+    return Bytes.add(PB_MAGIC, bytes);
+  }
+
+  /**
+   * @param bytes Bytes to check; may be {@code null}.
+   * @return True if passed <code>bytes</code> has {@link #PB_MAGIC} for a prefix; false when
+   *         <code>bytes</code> is {@code null} or shorter than the magic.
+   */
+  public static boolean isPBMagicPrefix(final byte [] bytes) {
+    if (bytes == null || bytes.length < PB_MAGIC.length) return false;
+    return Bytes.compareTo(PB_MAGIC, 0, PB_MAGIC.length, bytes, 0, PB_MAGIC.length) == 0;
+  }
+
+  /**
+   * @return Length of {@link #PB_MAGIC}
+   */
+  public static int lengthOfPBMagic() {
+    return PB_MAGIC.length;
+  }
+}