You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/01/22 02:42:50 UTC

svn commit: r1560234 [3/3] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-common/src/main/resources/ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-s...

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto Wed Jan 22 01:42:49 2014
@@ -164,6 +164,16 @@ message SnapshotDescription {
   optional int32 version = 5;
 }
 
+/**
+ * Description of the distributed procedure to execute
+ */
+message ProcedureDescription {
+  required string signature = 1; // the unique signature of the procedure
+  optional string instance = 2; // the procedure instance name
+  optional int64 creation_time = 3 [default = 0];
+  repeated NameStringPair configuration = 4;
+}
+
 message EmptyMsg {
 }
 

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto Wed Jan 22 01:42:49 2014
@@ -332,6 +332,23 @@ message IsMasterRunningResponse {
   required bool is_master_running = 1;
 }
 
+message ExecProcedureRequest {
+        required ProcedureDescription procedure = 1;
+}
+
+message ExecProcedureResponse {
+        required int64 expected_timeout = 1;
+}
+
+message IsProcedureDoneRequest {
+        optional ProcedureDescription procedure = 1;
+}
+
+message IsProcedureDoneResponse {
+        optional bool done = 1 [default = false];
+        optional ProcedureDescription snapshot = 2;
+}
+
 service MasterService {
   /** Used by the client to get the number of regions that have received the updated schema */
   rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -490,6 +507,19 @@ service MasterService {
    */
   rpc IsRestoreSnapshotDone(IsRestoreSnapshotDoneRequest) returns(IsRestoreSnapshotDoneResponse);
 
+  /**
+   * Execute a distributed procedure.
+   */
+  rpc ExecProcedure(ExecProcedureRequest) returns(ExecProcedureResponse);
+
+  /**
+   * Determine if the procedure is done yet.
+   */
+  rpc IsProcedureDone(IsProcedureDoneRequest) returns(IsProcedureDoneResponse);
+
+  /** return true if master is available */
+  /** rpc IsMasterRunning(IsMasterRunningRequest) returns(IsMasterRunningResponse); */
+
   /** Modify a namespace's metadata */
   rpc ModifyNamespace(ModifyNamespaceRequest)
     returns(ModifyNamespaceResponse);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jan 22 01:42:49 2014
@@ -106,6 +106,8 @@ import org.apache.hadoop.hbase.master.sn
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -116,6 +118,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
@@ -193,6 +196,10 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@@ -378,6 +385,8 @@ MasterServices, Server {
 
   // monitor for snapshot of hbase tables
   private SnapshotManager snapshotManager;
+  // monitor for distributed procedures
+  private MasterProcedureManagerHost mpmHost;
 
   /** The health check chore. */
   private HealthCheckChore healthCheckChore;
@@ -635,7 +644,7 @@ MasterServices, Server {
       if (this.serverManager != null) this.serverManager.stop();
       if (this.assignmentManager != null) this.assignmentManager.stop();
       if (this.fileSystemManager != null) this.fileSystemManager.stop();
-      if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down.");
+      if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
       this.zooKeeper.close();
     }
     LOG.info("HMaster main thread exiting");
@@ -700,8 +709,12 @@ MasterServices, Server {
         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
         ", setting cluster-up flag (Was=" + wasUp + ")");
 
-    // create the snapshot manager
-    this.snapshotManager = new SnapshotManager(this, this.metricsMaster);
+    // create/initialize the snapshot manager and other procedure managers
+    this.snapshotManager = new SnapshotManager();
+    this.mpmHost = new MasterProcedureManagerHost();
+    this.mpmHost.register(this.snapshotManager);
+    this.mpmHost.loadProcedures(conf);
+    this.mpmHost.initialize(this, this.metricsMaster);
   }
 
   /**
@@ -2164,7 +2177,7 @@ MasterServices, Server {
     }
     return info.getInfoPort();
   }
-  
+
   /**
    * @return array of coprocessor SimpleNames.
    */
@@ -2949,6 +2962,68 @@ MasterServices, Server {
     }
   }
 
+  /**
+   * Triggers an asynchronous attempt to run a distributed procedure.
+   * {@inheritDoc}
+   */
+  @Override
+  public ExecProcedureResponse execProcedure(RpcController controller,
+      ExecProcedureRequest request) throws ServiceException {
+    ProcedureDescription desc = request.getProcedure();
+    MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
+        .getSignature());
+    if (mpm == null) {
+      throw new ServiceException("The procedure is not registered: "
+          + desc.getSignature());
+    }
+
+    LOG.info(getClientIdAuditPrefix() + " procedure request for: "
+        + desc.getSignature());
+
+    try {
+      mpm.execProcedure(desc);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+
+    // send back the max amount of time the client should wait for the procedure
+    // to complete
+    long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
+    return ExecProcedureResponse.newBuilder().setExpectedTimeout(waitTime)
+        .build();
+  }
+
+  /**
+   * Checks if the specified procedure is done.
+   * @return a response whose done flag is true if the procedure is done,
+   *   or false if the procedure is still in the process of completing
+   * @throws ServiceException if no procedure manager is registered for the signature,
+   *  or if checking the procedure fails (the underlying IOException is wrapped).
+   */
+  @Override
+  public IsProcedureDoneResponse isProcedureDone(RpcController controller,
+      IsProcedureDoneRequest request) throws ServiceException {
+    ProcedureDescription desc = request.getProcedure();
+    MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
+        .getSignature());
+    if (mpm == null) {
+      throw new ServiceException("The procedure is not registered: "
+          + desc.getSignature());
+    }
+    LOG.debug("Checking to see if procedure from request:"
+        + desc.getSignature() + " is done");
+
+    try {
+      IsProcedureDoneResponse.Builder builder = IsProcedureDoneResponse
+          .newBuilder();
+      boolean done = mpm.isProcedureDone(desc);
+      builder.setDone(done);
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   @Override
   public ModifyNamespaceResponse modifyNamespace(RpcController controller,
       ModifyNamespaceRequest request) throws ServiceException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java Wed Jan 22 01:42:49 2014
@@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.master.Me
 import org.apache.hadoop.hbase.master.SnapshotSentinel;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
 import org.apache.hadoop.hbase.procedure.Procedure;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
 import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -86,7 +89,7 @@ import org.apache.zookeeper.KeeperExcept
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class SnapshotManager implements Stoppable {
+public class SnapshotManager extends MasterProcedureManager implements Stoppable {
   private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
 
   /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
@@ -133,9 +136,9 @@ public class SnapshotManager implements 
   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
 
   private boolean stopped;
-  private final MasterServices master;  // Needed by TableEventHandlers
-  private final MetricsMaster metricsMaster;
-  private final ProcedureCoordinator coordinator;
+  private MasterServices master;  // Needed by TableEventHandlers
+  private MetricsMaster metricsMaster;
+  private ProcedureCoordinator coordinator;
 
   // Is snapshot feature enabled?
   private boolean isSnapshotSupported = false;
@@ -154,37 +157,10 @@ public class SnapshotManager implements 
   private Map<TableName, SnapshotSentinel> restoreHandlers =
       new HashMap<TableName, SnapshotSentinel>();
 
-  private final Path rootDir;
-  private final ExecutorService executorService;
+  private Path rootDir;
+  private ExecutorService executorService;
 
-  /**
-   * Construct a snapshot manager.
-   * @param master
-   */
-  public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster)
-      throws KeeperException, IOException, UnsupportedOperationException {
-    this.master = master;
-    this.metricsMaster = metricsMaster;
-
-    this.rootDir = master.getMasterFileSystem().getRootDir();
-    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
-
-    // get the configuration for the coordinator
-    Configuration conf = master.getConfiguration();
-    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
-    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
-    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
-
-    // setup the default procedure coordinator
-    String name = master.getServerName().toString();
-    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
-    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
-        master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
-
-    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
-    this.executorService = master.getExecutorService();
-    resetTempDir();
-  }
+  public SnapshotManager() {}
 
   /**
    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
@@ -1024,4 +1000,69 @@ public class SnapshotManager implements 
       }
     }
   }
+
+  @Override
+  public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
+      IOException, UnsupportedOperationException {
+    this.master = master;
+    this.metricsMaster = metricsMaster;
+
+    this.rootDir = master.getMasterFileSystem().getRootDir();
+    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+
+    // get the configuration for the coordinator
+    Configuration conf = master.getConfiguration();
+    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
+    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+        master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
+
+    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
+    this.executorService = master.getExecutorService();
+    resetTempDir();
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
+  }
+
+  @Override
+  public void execProcedure(ProcedureDescription desc) throws IOException {
+    takeSnapshot(toSnapshotDescription(desc));
+  }
+
+  @Override
+  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+    return isSnapshotDone(toSnapshotDescription(desc));
+  }
+
+  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
+      throws IOException {
+    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
+    if (!desc.hasInstance()) {
+      throw new IOException("Snapshot name is not defined: " + desc.toString());
+    }
+    String snapshotName = desc.getInstance();
+    List<NameStringPair> props = desc.getConfigurationList();
+    String table = null;
+    for (NameStringPair prop : props) {
+      if ("table".equalsIgnoreCase(prop.getName())) {
+        table = prop.getValue();
+      }
+    }
+    if (table == null) {
+      throw new IOException("Snapshot table is not defined: " + desc.toString());
+    }
+    TableName tableName = TableName.valueOf(table);
+    builder.setTable(tableName.getNameAsString());
+    builder.setName(snapshotName);
+    builder.setType(SnapshotDescription.Type.FLUSH);
+    return builder.build();
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java Wed Jan 22 01:42:49 2014
@@ -192,6 +192,8 @@ public abstract class TakeSnapshotHandle
       completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
       status.markComplete("Snapshot " + snapshot.getName() + " of table " + snapshotTable
           + " completed");
+      LOG.info("Snapshot " + snapshot.getName() + " of table " + snapshotTable
+          + " completed");
       metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
     } catch (Exception e) {
       status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+/**
+* A life-cycle management interface for globally barriered procedures on master.
+* See the following doc on details of globally barriered procedure:
+* https://issues.apache.org/jira/secure/attachment/12555103/121127-global-barrier-proc.pdf
+*
+* To implement a custom globally barriered procedure, a user needs to extend two classes:
+* {@link MasterProcedureManager} and {@link RegionServerProcedureManager}. Implementation of
+* {@link MasterProcedureManager} is loaded into {@link HMaster} process via configuration
+* parameter 'hbase.procedure.master.classes', while implementation of
+* {@link RegionServerProcedureManager} is loaded into {@link HRegionServer} process via
+* configuration parameter 'hbase.procedure.regionserver.classes'.
+*
+* An example of globally barriered procedure implementation is {@link SnapshotManager} and
+* {@link RegionServerSnapshotManager}.
+*
+* A globally barriered procedure is identified by its signature (usually it is the name of the
+* procedure znode). During the initialization phase, the initialize methods are called by both
+* {@link HMaster} and {@link HRegionServer}, which create the procedure znode and register the
+* listeners. A procedure can be triggered by its signature and an instance name (encapsulated in
+* a {@link ProcedureDescription} object). When the servers are shut down, the stop methods on both
+* classes are called to clean up the data associated with the procedure.
+*/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MasterProcedureManager extends ProcedureManager implements
+    Stoppable {
+  /**
+   * Initialize a globally barriered procedure for master.
+   *
+   * @param master Master service interface
+   * @throws KeeperException
+   * @throws IOException
+   * @throws UnsupportedOperationException
+   */
+  public abstract void initialize(MasterServices master, MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException;
+
+  /**
+   * Execute a distributed procedure on cluster
+   *
+   * @param desc Procedure description
+   * @throws IOException
+   */
+  public abstract void execProcedure(ProcedureDescription desc) throws IOException;
+
+  /**
+   * Check if the procedure is finished successfully
+   *
+   * @param desc Procedure description
+   * @return true if the specified procedure is finished successfully
+   * @throws IOException
+   */
+  public abstract boolean isProcedureDone(ProcedureDescription desc) throws IOException;
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Hashtable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Provides the globally barriered procedure framework and environment for
+ * master oriented operations. {@link HMaster} interacts with the loaded
+ * procedure manager through this class.
+ */
+public class MasterProcedureManagerHost extends
+    ProcedureManagerHost<MasterProcedureManager> {
+
+  private Hashtable<String, MasterProcedureManager> procedureMgrMap
+      = new Hashtable<String, MasterProcedureManager>();
+
+  @Override
+  public void loadProcedures(Configuration conf) {
+    loadUserProcedures(conf, MASTER_PROCEUDRE_CONF_KEY);
+    for (MasterProcedureManager mpm : getProcedureManagers()) {
+      procedureMgrMap.put(mpm.getProcedureSignature(), mpm);
+    }
+  }
+
+  public void initialize(MasterServices master, final MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    for (MasterProcedureManager mpm : getProcedureManagers()) {
+      mpm.initialize(master, metricsMaster);
+    }
+  }
+
+  public void stop(String why) {
+    for (MasterProcedureManager mpm : getProcedureManagers()) {
+      mpm.stop(why);
+    }
+  }
+
+  public MasterProcedureManager getProcedureManager(String signature) {
+    return procedureMgrMap.get(signature);
+  }
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ProcedureManager {
+
+  /**
+   * Return the unique signature of the procedure. This signature uniquely
+   * identifies the procedure. By default, this signature is the string used in
+   * the procedure controller (i.e., the root ZK node name for the procedure)
+   */
+  public abstract String getProcedureSignature();
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ProcedureManager)) {
+      return false;
+    }
+    ProcedureManager other = (ProcedureManager)obj;
+    return this.getProcedureSignature().equals(other.getProcedureSignature());
+  }
+
+  @Override
+  public int hashCode() {
+    return this.getProcedureSignature().hashCode();
+  }
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Provides the common setup framework and runtime services for globally
+ * barriered procedure invocation from HBase services.
+ * @param <E> the specific procedure management extension that a concrete
+ * implementation provides
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ProcedureManagerHost<E extends ProcedureManager> {
+
+  public static final String REGIONSERVER_PROCEDURE_CONF_KEY =
+      "hbase.procedure.regionserver.classes";
+  public static final String MASTER_PROCEUDRE_CONF_KEY =
+      "hbase.procedure.master.classes";
+
+  private static final Log LOG = LogFactory.getLog(ProcedureManagerHost.class);
+
+  protected Set<E> procedures = new HashSet<E>();
+
+  /**
+   * Load user procedures. Read the class names from the given configuration key.
+   * Intended to be called from {@link #loadProcedures(Configuration)} implementations.
+   */
+  protected void loadUserProcedures(Configuration conf, String confKey) {
+    Class<?> implClass = null;
+
+    // load the user procedure classes listed under confKey in the configuration
+    String[] defaultProcClasses = conf.getStrings(confKey);
+    if (defaultProcClasses == null || defaultProcClasses.length == 0)
+      return;
+
+    List<E> configured = new ArrayList<E>();
+    for (String className : defaultProcClasses) {
+      className = className.trim();
+      ClassLoader cl = this.getClass().getClassLoader();
+      Thread.currentThread().setContextClassLoader(cl);
+      try {
+        implClass = cl.loadClass(className);
+        configured.add(loadInstance(implClass));
+        LOG.info("User procedure " + className + " was loaded successfully.");
+      } catch (ClassNotFoundException e) {
+        LOG.warn("Class " + className + " cannot be found. " +
+            e.getMessage());
+      } catch (IOException e) {
+        LOG.warn("Load procedure " + className + " failed. " +
+            e.getMessage());
+      }
+    }
+
+    // add entire set to the collection
+    procedures.addAll(configured);
+  }
+
+  @SuppressWarnings("unchecked")
+  public E loadInstance(Class<?> implClass) throws IOException {
+    // create the instance
+    E impl;
+    Object o = null;
+    try {
+      o = implClass.newInstance();
+      impl = (E)o;
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+
+    return impl;
+  }
+
+  // Register a procedure manager object
+  public void register(E obj) {
+    procedures.add(obj);
+  }
+
+  public Set<E> getProcedureManagers() {
+    Set<E> returnValue = new HashSet<E>();
+    for (E e: procedures) {
+      returnValue.add(e);
+    }
+    return returnValue;
+  }
+
+  public abstract void loadProcedures(Configuration conf);
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A life-cycle management interface for globally barriered procedures on
+ * region servers. Implementations are initialized with the region server's
+ * services, then started, and finally stopped.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RegionServerProcedureManager extends ProcedureManager {
+  /**
+   * Initialize a globally barriered procedure for region servers.
+   *
+   * @param rss Region Server service interface
+   * @throws KeeperException raised by the implementation; presumably when
+   *           ZooKeeper cannot be reached (implementation specific)
+   */
+  public abstract void initialize(RegionServerServices rss) throws KeeperException;
+
+  /**
+   * Start accepting procedure requests.
+   */
+  public abstract void start();
+
+  /**
+   * Close <tt>this</tt> and all running procedure tasks
+   *
+   * @param force forcefully stop all running tasks
+   * @throws IOException raised by the implementation if it fails to shut down
+   */
+  public abstract void stop(boolean force) throws IOException;
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Hosts the globally barriered procedure managers that run inside a region
+ * server.  {@link HRegionServer} drives the life cycle of every loaded
+ * procedure manager through this class.
+ */
+public class RegionServerProcedureManagerHost extends
+    ProcedureManagerHost<RegionServerProcedureManager> {
+
+  private static final Log LOG = LogFactory.getLog(RegionServerProcedureManagerHost.class);
+
+  /**
+   * Initialize every hosted procedure manager, logging before and after each one.
+   *
+   * @param rss region server services handed to each manager
+   * @throws KeeperException propagated from the first manager that fails to
+   *           initialize; managers after it are not initialized
+   */
+  public void initialize(RegionServerServices rss) throws KeeperException {
+    for (RegionServerProcedureManager manager : procedures) {
+      LOG.info("Procedure " + manager.getProcedureSignature() + " is initializing");
+      manager.initialize(rss);
+      LOG.info("Procedure " + manager.getProcedureSignature() + " is initialized");
+    }
+  }
+
+  /**
+   * Start every hosted procedure manager, logging before and after each one.
+   */
+  public void start() {
+    for (RegionServerProcedureManager manager : procedures) {
+      LOG.info("Procedure " + manager.getProcedureSignature() + " is starting");
+      manager.start();
+      LOG.info("Procedure " + manager.getProcedureSignature() + " is started");
+    }
+  }
+
+  /**
+   * Stop every hosted procedure manager. An {@link IOException} from one manager
+   * is logged and does not prevent the remaining managers from being stopped.
+   *
+   * @param force passed through to each manager's stop method
+   */
+  public void stop(boolean force) {
+    for (RegionServerProcedureManager manager : procedures) {
+      try {
+        manager.stop(force);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close procedure " + manager.getProcedureSignature()
+            + " cleanly", ioe);
+      }
+    }
+  }
+
+  /**
+   * Load the user procedure managers configured under
+   * REGIONSERVER_PROCEDURE_CONF_KEY, then add the default snapshot manager.
+   *
+   * @param conf configuration naming the user procedure managers
+   */
+  @Override
+  public void loadProcedures(Configuration conf) {
+    loadUserProcedures(conf, REGIONSERVER_PROCEDURE_CONF_KEY);
+    // the snapshot manager is always hosted, whatever the user configured
+    procedures.add(new RegionServerSnapshotManager());
+  }
+
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jan 22 01:42:49 2014
@@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.ipc.Serve
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -489,8 +490,7 @@ public class HRegionServer implements Cl
 
   private RegionServerCoprocessorHost rsHost;
 
-  /** Handle all the snapshot requests to this server */
-  RegionServerSnapshotManager snapshotManager;
+  private RegionServerProcedureManagerHost rspmHost;
 
   // configuration setting on if replay WAL edits directly to another RS
   private final boolean distributedLogReplay;
@@ -749,11 +749,13 @@ public class HRegionServer implements Cl
       this.abort("Failed to retrieve Cluster ID",e);
     }
 
-    // watch for snapshots
+    // watch for snapshots and other procedures
     try {
-      this.snapshotManager = new RegionServerSnapshotManager(this);
+      rspmHost = new RegionServerProcedureManagerHost();
+      rspmHost.loadProcedures(conf);
+      rspmHost.initialize(this);
     } catch (KeeperException e) {
-      this.abort("Failed to reach zk cluster when creating snapshot handler.");
+      this.abort("Failed to reach zk cluster when creating procedure handler.", e);
     }
     this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
         ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
@@ -856,8 +858,9 @@ public class HRegionServer implements Cl
       }
 
       if (!this.stopped && isHealthy()){
-        // start the snapshot handler, since the server is ready to run
-        this.snapshotManager.start();
+        // start the snapshot handler and other procedure handlers,
+        // since the server is ready to run
+        rspmHost.start();
       }
 
       // We registered with the Master.  Go into run mode.
@@ -945,12 +948,8 @@ public class HRegionServer implements Cl
       this.nonceManagerChore.interrupt();
     }
 
-    // Stop the snapshot handler, forcefully killing all running tasks
-    try {
-      if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
-    } catch (IOException e) {
-      LOG.warn("Failed to close snapshot handler cleanly", e);
-    }
+    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks.
+    // rspmHost is only assigned during initialization, so it may still be null when the
+    // server shuts down before that point was reached; skip the call in that case.
+    if (rspmHost != null) {
+      rspmHost.stop(this.abortRequested || this.killed);
+    }
 
     if (this.killed) {
       // Just skip out w/o closing regions.  Used when testing.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Wed Jan 22 01:42:49 2014
@@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.master.sn
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
 import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
 import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -71,7 +71,7 @@ import com.google.protobuf.InvalidProtoc
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class RegionServerSnapshotManager {
+public class RegionServerSnapshotManager extends RegionServerProcedureManager {
   private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
 
   /** Maximum number of snapshot region tasks that can run concurrently */
@@ -93,9 +93,9 @@ public class RegionServerSnapshotManager
   /** Default amount of time to check for errors while regions finish snapshotting */
   private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
 
-  private final RegionServerServices rss;
-  private final ProcedureMemberRpcs memberRpcs;
-  private final ProcedureMember member;
+  private RegionServerServices rss;
+  private ProcedureMemberRpcs memberRpcs;
+  private ProcedureMember member;
 
   /**
    * Exposed for testing.
@@ -111,32 +111,12 @@ public class RegionServerSnapshotManager
     this.member = procMember;
   }
 
-  /**
-   * Create a default snapshot handler - uses a zookeeper based member controller.
-   * @param rss region server running the handler
-   * @throws KeeperException if the zookeeper cluster cannot be reached
-   */
-  public RegionServerSnapshotManager(RegionServerServices rss)
-      throws KeeperException {
-    this.rss = rss;
-    ZooKeeperWatcher zkw = rss.getZooKeeper();
-    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
-        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
-
-    // read in the snapshot request configuration properties
-    Configuration conf = rss.getConfiguration();
-    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
-    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
-
-    // create the actual snapshot procedure member
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
-      opThreads, keepAlive);
-    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
-  }
+  /**
+   * Create a snapshot manager that is not yet attached to a region server. The region
+   * server services, member RPCs and procedure member are assigned later by
+   * {@code initialize(RegionServerServices)}.
+   */
+  public RegionServerSnapshotManager() {}
 
   /**
    * Start accepting snapshot requests.
    */
+  @Override
   public void start() {
     LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
     this.memberRpcs.start(rss.getServerName().toString(), member);
@@ -147,6 +127,7 @@ public class RegionServerSnapshotManager
    * @param force forcefully stop all running tasks
    * @throws IOException
    */
+  @Override
   public void stop(boolean force) throws IOException {
     String mode = force ? "abruptly" : "gracefully";
     LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
@@ -373,4 +354,33 @@ public class RegionServerSnapshotManager
       this.executor.shutdownNow();
     }
   }
+
+  /**
+   * Attach this snapshot handler to a region server - uses a zookeeper based member
+   * controller and a thread pool sized from the region server's configuration.
+   * @param rss region server running the handler
+   * @throws KeeperException if the zookeeper cluster cannot be reached
+   */
+  @Override
+  public void initialize(RegionServerServices rss) throws KeeperException {
+    this.rss = rss;
+    ZooKeeperWatcher watcher = rss.getZooKeeper();
+    this.memberRpcs = new ZKProcedureMemberRpcs(watcher,
+        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
+
+    // snapshot request tuning comes from the region server's configuration
+    Configuration configuration = rss.getConfiguration();
+    long keepAliveMillis =
+        configuration.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+    int threadCount =
+        configuration.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
+
+    // the procedure member runs snapshot subprocedures on its own pool
+    String serverName = rss.getServerName().toString();
+    ThreadPoolExecutor workers =
+        ProcedureMember.defaultPool(serverName, threadCount, keepAliveMillis);
+    this.member = new ProcedureMember(memberRpcs, workers, new SnapshotSubprocedureBuilder());
+  }
+
+  /**
+   * @return the signature of the online snapshot procedure,
+   *         {@code SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION}
+   */
+  @Override
+  public String getProcedureSignature() {
+    return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
+  }
+
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
+import org.apache.zookeeper.KeeperException;
+
+public class SimpleMasterProcedureManager extends MasterProcedureManager {
+
+  public static final String SIMPLE_SIGNATURE = "simle_test";
+
+  private static final Log LOG = LogFactory.getLog(SimpleMasterProcedureManager.class);
+
+  private MasterServices master;
+  private ProcedureCoordinator coordinator;
+  private ExecutorService executorService;
+
+  private boolean done;
+
+  @Override
+  public void stop(String why) {
+    LOG.info("stop: " + why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public void initialize(MasterServices master, MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    this.master = master;
+    this.done = false;
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
+    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+        master.getZooKeeper(), getProcedureSignature(), name);
+
+    this.coordinator = new ProcedureCoordinator(comms, tpool);
+    this.executorService = master.getExecutorService();
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return SIMPLE_SIGNATURE;
+  }
+
+  @Override
+  public void execProcedure(ProcedureDescription desc) throws IOException {
+    this.done = false;
+    // start the process on the RS
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+
+    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+    List<String> servers = new ArrayList<String>();
+    for (ServerName sn : serverNames) {
+      servers.add(sn.toString());
+    }
+    Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
+    if (proc == null) {
+      String msg = "Failed to submit distributed procedure for '"
+          + getProcedureSignature() + "'";
+      LOG.error(msg);
+      throw new HBaseSnapshotException(msg);
+    }
+
+    try {
+      // wait for the procedure to complete.  A timer thread is kicked off that should cancel this
+      // if it takes too long.
+      proc.waitForCompleted();
+      LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+      this.done = true;
+    } catch (InterruptedException e) {
+      ForeignException ee =
+          new ForeignException("Interrupted while waiting for procdure to finish", e);
+      monitor.receive(ee);
+      Thread.currentThread().interrupt();
+    } catch (ForeignException e) {
+      monitor.receive(e);
+    }
+  }
+
+  @Override
+  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+    return done;
+  }
+
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Minimal region-server-side procedure manager used by tests. It joins procedures
+ * carrying {@link SimpleMasterProcedureManager#SIMPLE_SIGNATURE} and runs a trivial task
+ * inside the barrier.
+ */
+public class SimpleRSProcedureManager extends RegionServerProcedureManager {
+
+  private static final Log LOG = LogFactory.getLog(SimpleRSProcedureManager.class);
+
+  private RegionServerServices rss;
+  private ProcedureMemberRpcs memberRpcs;
+  private ProcedureMember member;
+
+  /**
+   * Remember the region server services and build a ZooKeeper based procedure member
+   * backed by a single-threaded pool.
+   *
+   * @param rss region server services this manager runs in
+   */
+  @Override
+  public void initialize(RegionServerServices rss) throws KeeperException {
+    this.rss = rss;
+    ZooKeeperWatcher zkw = rss.getZooKeeper();
+    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
+
+    ThreadPoolExecutor pool =
+        ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
+    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
+    LOG.info("Initialized: " + rss.getServerName().toString());
+  }
+
+  /**
+   * Start the member RPCs under this region server's name.
+   */
+  @Override
+  public void start() {
+    this.memberRpcs.start(rss.getServerName().toString(), member);
+    LOG.info("Started.");
+  }
+
+  /**
+   * Close the procedure member and then, even if that fails, the member RPCs.
+   *
+   * @param force only logged by this manager
+   */
+  @Override
+  public void stop(boolean force) throws IOException {
+    LOG.info("stop: " + force);
+    try {
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /**
+   * @return {@link SimpleMasterProcedureManager#SIMPLE_SIGNATURE}
+   */
+  @Override
+  public String getProcedureSignature() {
+    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
+  }
+
+  /**
+   * If in a running state, creates the specified subprocedure for handling a procedure.
+   * @param name name passed on to the created subprocedure
+   * @return Subprocedure to submit to the ProcedureMember.
+   * @throws IllegalStateException if the region server is stopping or stopped
+   */
+  public Subprocedure buildSubprocedure(String name) {
+
+    // don't run a procedure if the parent is stop(ping)
+    if (rss.isStopping() || rss.isStopped()) {
+      throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
+          + ", because stopping/stopped!");
+    }
+
+    LOG.info("Attempting to run a procedure.");
+    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+    Configuration conf = rss.getConfiguration();
+
+    // every subprocedure gets its own task pool
+    SimpleSubprocedurePool taskManager =
+        new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
+    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
+  }
+
+  /**
+   * Build the actual procedure runner that will do all the 'hard' work
+   */
+  public class SimleSubprocedureBuilder implements SubprocedureFactory {
+
+    @Override
+    public Subprocedure buildSubprocedure(String name, byte[] data) {
+      LOG.info("Building procedure: " + name);
+      return SimpleRSProcedureManager.this.buildSubprocedure(name);
+    }
+  }
+
+  /**
+   * Single-threaded task pool that tracks the futures of submitted tasks so that
+   * they can be waited on, closed or aborted together.
+   */
+  public class SimpleSubprocedurePool implements Closeable, Abortable {
+
+    private final ExecutorCompletionService<Void> taskPool;
+    private final ThreadPoolExecutor executor;
+    private volatile boolean aborted;
+    private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+    private final String name;
+
+    /**
+     * @param name used in the worker thread name and in reported errors
+     * @param conf currently unused
+     */
+    public SimpleSubprocedurePool(String name, Configuration conf) {
+      this.name = name;
+      executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>(),
+          new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
+      taskPool = new ExecutorCompletionService<Void>(executor);
+    }
+
+    /**
+     * Submit a task to the pool.
+     */
+    public void submitTask(final Callable<Void> task) {
+      Future<Void> f = this.taskPool.submit(task);
+      futures.add(f);
+    }
+
+    /**
+     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
+     *
+     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+     * @throws ForeignException if a task failed, or if the wait was interrupted after an abort
+     */
+    public boolean waitForOutstandingTasks() throws ForeignException {
+      LOG.debug("Waiting for procedure to finish.");
+
+      try {
+        for (Future<Void> f: futures) {
+          f.get();
+        }
+        return true;
+      } catch (InterruptedException e) {
+        if (aborted) throw new ForeignException(
+            "Interrupted and found to be aborted while waiting for tasks!", e);
+        // not aborted: keep the interrupt status and report failure through the return value
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof ForeignException) {
+          throw (ForeignException) e.getCause();
+        }
+        throw new ForeignException(name, e.getCause());
+      } finally {
+        // close off remaining tasks
+        for (Future<Void> f: futures) {
+          if (!f.isDone()) {
+            f.cancel(true);
+          }
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
+     * finish
+     */
+    @Override
+    public void close() {
+      executor.shutdown();
+    }
+
+    /**
+     * Mark the pool as aborted and interrupt running tasks. Only the first call has an effect.
+     */
+    @Override
+    public void abort(String why, Throwable e) {
+      if (this.aborted) return;
+
+      this.aborted = true;
+      LOG.warn("Aborting because: " + why, e);
+      this.executor.shutdownNow();
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.aborted;
+    }
+  }
+
+  /**
+   * Subprocedure that runs a single {@link RSSimpleTask} inside the barrier.
+   */
+  public class SimpleSubprocedure extends Subprocedure {
+    private final RegionServerServices rss;
+    private final SimpleSubprocedurePool taskManager;
+
+    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
+        ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
+      super(member, name, errorListener, 500, 60000);
+      LOG.info("Constructing a SimpleSubprocedure.");
+      this.rss = rss;
+      this.taskManager = taskManager;
+    }
+
+    /**
+     * Callable task that only logs the name of the region server it runs on.
+     * TODO. We don't need a thread pool to execute such a simple task. This can be
+     * simplified with no use of subprocedurepool.
+     */
+    class RSSimpleTask implements Callable<Void> {
+      RSSimpleTask() {}
+
+      @Override
+      public Void call() throws Exception {
+        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
+        return null;
+      }
+
+    }
+
+    /**
+     * Submit the task, wait for it, and surface any error received by the monitor
+     * between the steps. The task pool is shut down afterwards in every case.
+     */
+    private void execute() throws ForeignException {
+      try {
+        monitor.rethrowException();
+
+        // running a task (e.g., roll log, flush table) on region server
+        taskManager.submitTask(new RSSimpleTask());
+        monitor.rethrowException();
+
+        // wait for everything to complete.
+        taskManager.waitForOutstandingTasks();
+        monitor.rethrowException();
+      } finally {
+        // The pool belongs to this subprocedure alone and its core thread never times out,
+        // so shut it down here; otherwise each successful run leaves a worker thread behind.
+        taskManager.close();
+      }
+    }
+
+    @Override
+    public void acquireBarrier() throws ForeignException {
+      // do nothing, executing in inside barrier step.
+    }
+
+    /**
+     * Run the simple task on this region server.
+     */
+    @Override
+    public void insideBarrier() throws ForeignException {
+      execute();
+    }
+
+    /**
+     * Cancel threads if they haven't finished.
+     */
+    @Override
+    public void cleanup(Exception e) {
+      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
+    }
+  }
+
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Starts a mini cluster with the simple test procedure managers configured for the
+ * master and the region servers, then executes the simple procedure through the
+ * admin API.
+ */
+@Category(SmallTests.class)
+public class TestProcedureManager {
+
+  static final Log LOG = LogFactory.getLog(TestProcedureManager.class);
+  private static final int NUM_RS = 2;
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Configure which procedure managers get loaded and bring up the mini cluster.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration configuration = util.getConfiguration();
+
+    // name the procedure manager implementations for the master and the region servers
+    String masterManager = SimpleMasterProcedureManager.class.getName();
+    configuration.set(ProcedureManagerHost.MASTER_PROCEUDRE_CONF_KEY, masterManager);
+    String regionServerManager = SimpleRSProcedureManager.class.getName();
+    configuration.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionServerManager);
+
+    util.startMiniCluster(NUM_RS);
+  }
+
+  /**
+   * Shut the mini cluster down once all tests have run.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Execute the simple procedure with an empty property map; the test passes when the
+   * call returns without throwing.
+   */
+  @Test
+  public void testSimpleProcedureManager() throws IOException {
+    HBaseAdmin hbaseAdmin = util.getHBaseAdmin();
+    String signature = SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
+
+    hbaseAdmin.execProcedure(signature, "mytest", new HashMap<String, String>());
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java Wed Jan 22 01:42:49 2014
@@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
@@ -185,6 +187,60 @@ public class TestFlushSnapshotFromClient
         admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
   }
 
+
+  /**
+   * Test flush snapshotting an online table by running the snapshot procedure via execProcedure
+   */
+  @Test (timeout=300000)
+  public void testFlushTableSnapshotWithProcedure() throws Exception {
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    // make sure we don't fail on listing snapshots
+    SnapshotTestingUtils.assertNoSnapshots(admin);
+
+    // put some stuff in the table; close the HTable afterwards so it is not leaked
+    HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
+    try {
+      SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
+    } finally {
+      table.close();
+    }
+
+    // get the name of all the regionservers hosting the snapshotted table
+    Set<String> snapshotServers = new HashSet<String>();
+    List<RegionServerThread> servers = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
+    for (RegionServerThread server : servers) {
+      if (server.getRegionServer().getOnlineRegions(TABLE_NAME).size() > 0) {
+        snapshotServers.add(server.getRegionServer().getServerName().toString());
+      }
+    }
+
+    LOG.debug("FS state before snapshot:");
+    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
+        FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
+
+    // take a snapshot of the enabled table by executing the snapshot procedure
+    String snapshotString = "offlineTableSnapshot";
+    byte[] snapshot = Bytes.toBytes(snapshotString);
+    Map<String, String> props = new HashMap<String, String>();
+    props.put("table", STRING_TABLE_NAME);
+    admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION,
+        snapshotString, props);
+    LOG.debug("Snapshot completed.");
+
+    // make sure we have the snapshot
+    List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
+      snapshot, TABLE_NAME);
+
+    // make sure it's a valid snapshot
+    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    LOG.debug("FS state after snapshot:");
+    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
+        FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
+    SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
+        admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
+  }
+
   @Test (timeout=300000)
   public void testSnapshotFailsOnNonExistantTable() throws Exception {
     HBaseAdmin admin = UTIL.getHBaseAdmin();