You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/01/22 02:42:50 UTC
svn commit: r1560234 [3/3] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-common/src/main/resources/
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/ hbase-s...
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/HBase.proto Wed Jan 22 01:42:49 2014
@@ -164,6 +164,16 @@ message SnapshotDescription {
optional int32 version = 5;
}
+/**
+ * Description of a distributed procedure to execute: which procedure
+ * (signature), which run of it (instance) and any procedure-specific
+ * settings (configuration).
+ */
+message ProcedureDescription {
+ required string signature = 1; // the unique signature of the procedure
+ optional string instance = 2; // the procedure instance name
+ // defaults to 0 when the sender does not set it
+ // NOTE(review): unit/epoch of the timestamp is not stated here -- confirm with callers
+ optional int64 creation_time = 3 [default = 0];
+ // name/value pairs passed along with the request; their meaning is up to the
+ // procedure implementation that receives the description
+ repeated NameStringPair configuration = 4;
+}
+
message EmptyMsg {
}
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Master.proto Wed Jan 22 01:42:49 2014
@@ -332,6 +332,23 @@ message IsMasterRunningResponse {
required bool is_master_running = 1;
}
+// Request to execute the distributed procedure described by 'procedure'.
+message ExecProcedureRequest {
+ required ProcedureDescription procedure = 1;
+}
+
+// Reply to ExecProcedureRequest.
+message ExecProcedureResponse {
+ // how long the client should expect to wait for the procedure to complete
+ // NOTE(review): the time unit is not stated in this file -- presumably milliseconds; confirm
+ required int64 expected_timeout = 1;
+}
+
+// Request to check whether the described procedure has finished.
+message IsProcedureDoneRequest {
+ // declared optional, so receivers cannot assume a description is present
+ optional ProcedureDescription procedure = 1;
+}
+
+// Reply to IsProcedureDoneRequest.
+message IsProcedureDoneResponse {
+ // true once the procedure is done; false when unset
+ optional bool done = 1 [default = false];
+ // NOTE(review): the field is named 'snapshot' although it carries a
+ // ProcedureDescription; the name looks carried over from the snapshot
+ // messages -- confirm the intended name before relying on it
+ optional ProcedureDescription snapshot = 2;
+}
+
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -490,6 +507,19 @@ service MasterService {
*/
rpc IsRestoreSnapshotDone(IsRestoreSnapshotDoneRequest) returns(IsRestoreSnapshotDoneResponse);
+ /**
+ * Execute a distributed procedure.
+ */
+ rpc ExecProcedure(ExecProcedureRequest) returns(ExecProcedureResponse);
+
+ /**
+ * Determine if the procedure is done yet.
+ */
+ rpc IsProcedureDone(IsProcedureDoneRequest) returns(IsProcedureDoneResponse);
+
+ /** return true if master is available */
+ /** rpc IsMasterRunning(IsMasterRunningRequest) returns(IsMasterRunningResponse); */
+
/** Modify a namespace's metadata */
rpc ModifyNamespace(ModifyNamespaceRequest)
returns(ModifyNamespaceResponse);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jan 22 01:42:49 2014
@@ -106,6 +106,8 @@ import org.apache.hadoop.hbase.master.sn
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -116,6 +118,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
@@ -193,6 +196,10 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@@ -378,6 +385,8 @@ MasterServices, Server {
// monitor for snapshot of hbase tables
private SnapshotManager snapshotManager;
+ // monitor for distributed procedures
+ private MasterProcedureManagerHost mpmHost;
/** The health check chore. */
private HealthCheckChore healthCheckChore;
@@ -635,7 +644,7 @@ MasterServices, Server {
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
- if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down.");
+ if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
this.zooKeeper.close();
}
LOG.info("HMaster main thread exiting");
@@ -700,8 +709,12 @@ MasterServices, Server {
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
", setting cluster-up flag (Was=" + wasUp + ")");
- // create the snapshot manager
- this.snapshotManager = new SnapshotManager(this, this.metricsMaster);
+ // create/initialize the snapshot manager and other procedure managers
+ this.snapshotManager = new SnapshotManager();
+ this.mpmHost = new MasterProcedureManagerHost();
+ this.mpmHost.register(this.snapshotManager);
+ this.mpmHost.loadProcedures(conf);
+ this.mpmHost.initialize(this, this.metricsMaster);
}
/**
@@ -2164,7 +2177,7 @@ MasterServices, Server {
}
return info.getInfoPort();
}
-
+
/**
* @return array of coprocessor SimpleNames.
*/
@@ -2949,6 +2962,68 @@ MasterServices, Server {
}
}
+ /**
+  * Triggers an asynchronous attempt to run a distributed procedure on behalf of a client.
+  * The request is routed to the procedure manager registered under the signature found in
+  * the request's procedure description.
+  * {@inheritDoc}
+  * @throws ServiceException if no manager is registered for the signature, or if the manager
+  *         fails with an IOException (which is wrapped)
+  */
+ @Override
+ public ExecProcedureResponse execProcedure(RpcController controller,
+     ExecProcedureRequest request) throws ServiceException {
+   final ProcedureDescription procedure = request.getProcedure();
+   final String signature = procedure.getSignature();
+
+   // unknown signatures are rejected before anything is logged or started
+   final MasterProcedureManager manager = this.mpmHost.getProcedureManager(signature);
+   if (manager == null) {
+     throw new ServiceException("The procedure is not registered: " + signature);
+   }
+
+   LOG.info(getClientIdAuditPrefix() + " procedure request for: " + signature);
+
+   try {
+     manager.execProcedure(procedure);
+   } catch (IOException ioe) {
+     throw new ServiceException(ioe);
+   }
+
+   // report the max amount of time the client should wait for the procedure to complete
+   ExecProcedureResponse.Builder response = ExecProcedureResponse.newBuilder();
+   response.setExpectedTimeout(SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
+   return response.build();
+ }
+
+
+ /**
+  * Checks if the specified procedure is done.
+  * @param controller RPC controller; not used by this method
+  * @param request request carrying the description of the procedure to check
+  * @return a response whose <code>done</code> flag is true if the procedure is done,
+  *         false if the procedure is in the process of completing
+  * @throws ServiceException if the request carries no procedure description, if no manager
+  *         is registered for the procedure signature, or if the manager's check fails with
+  *         an IOException (for example a failed procedure reporting its failure reason)
+  */
+ @Override
+ public IsProcedureDoneResponse isProcedureDone(RpcController controller,
+     IsProcedureDoneRequest request) throws ServiceException {
+   // 'procedure' is optional in IsProcedureDoneRequest; without this check a missing
+   // description would be reported as "not registered" with an empty signature
+   if (!request.hasProcedure()) {
+     throw new ServiceException("The request does not contain a procedure description");
+   }
+   ProcedureDescription desc = request.getProcedure();
+   MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
+       .getSignature());
+   if (mpm == null) {
+     throw new ServiceException("The procedure is not registered: "
+         + desc.getSignature());
+   }
+   LOG.debug("Checking to see if procedure from request:"
+       + desc.getSignature() + " is done");
+
+   try {
+     IsProcedureDoneResponse.Builder builder = IsProcedureDoneResponse
+         .newBuilder();
+     // the manager decides; an IOException from it is surfaced to the client
+     boolean done = mpm.isProcedureDone(desc);
+     builder.setDone(done);
+     return builder.build();
+   } catch (IOException e) {
+     throw new ServiceException(e);
+   }
+ }
+
+
@Override
public ModifyNamespaceResponse modifyNamespace(RpcController controller,
ModifyNamespaceRequest request) throws ServiceException {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java Wed Jan 22 01:42:49 2014
@@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.master.Me
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@@ -86,7 +89,7 @@ import org.apache.zookeeper.KeeperExcept
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class SnapshotManager implements Stoppable {
+public class SnapshotManager extends MasterProcedureManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
@@ -133,9 +136,9 @@ public class SnapshotManager implements
private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
private boolean stopped;
- private final MasterServices master; // Needed by TableEventHandlers
- private final MetricsMaster metricsMaster;
- private final ProcedureCoordinator coordinator;
+ private MasterServices master; // Needed by TableEventHandlers
+ private MetricsMaster metricsMaster;
+ private ProcedureCoordinator coordinator;
// Is snapshot feature enabled?
private boolean isSnapshotSupported = false;
@@ -154,37 +157,10 @@ public class SnapshotManager implements
private Map<TableName, SnapshotSentinel> restoreHandlers =
new HashMap<TableName, SnapshotSentinel>();
- private final Path rootDir;
- private final ExecutorService executorService;
+ private Path rootDir;
+ private ExecutorService executorService;
- /**
- * Construct a snapshot manager.
- * @param master
- */
- public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster)
- throws KeeperException, IOException, UnsupportedOperationException {
- this.master = master;
- this.metricsMaster = metricsMaster;
-
- this.rootDir = master.getMasterFileSystem().getRootDir();
- checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
-
- // get the configuration for the coordinator
- Configuration conf = master.getConfiguration();
- long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
- long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
- int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
-
- // setup the default procedure coordinator
- String name = master.getServerName().toString();
- ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
- ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
- master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
-
- this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
- this.executorService = master.getExecutorService();
- resetTempDir();
- }
+ /**
+ * No-argument constructor. It sets up nothing itself; the master, metrics, root directory,
+ * coordinator and executor service fields are assigned in
+ * {@link #initialize(MasterServices, MetricsMaster)}.
+ */
+ public SnapshotManager() {}
/**
* Fully specify all necessary components of a snapshot manager. Exposed for testing.
@@ -1024,4 +1000,69 @@ public class SnapshotManager implements
}
}
}
+
+ /**
+  * Binds this manager to the given master: stores the master and its metrics, resolves the
+  * root directory, checks snapshot support, builds the procedure coordinator from the
+  * master's configuration and finally resets the temporary directory.
+  * @param master master services this manager works against
+  * @param metricsMaster master metrics instance kept by this manager
+  */
+ @Override
+ public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
+     IOException, UnsupportedOperationException {
+   this.master = master;
+   this.metricsMaster = metricsMaster;
+   this.rootDir = master.getMasterFileSystem().getRootDir();
+   checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+
+   // coordinator settings are read from the master's configuration
+   final Configuration masterConf = master.getConfiguration();
+   final long wakeMillis =
+       masterConf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
+   final long timeoutMs =
+       masterConf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+   final int poolThreads =
+       masterConf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
+
+   // the default coordinator: a thread pool plus ZooKeeper based RPCs, both named after
+   // this server
+   final String serverName = master.getServerName().toString();
+   final ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(serverName, poolThreads);
+   final ProcedureCoordinatorRpcs rpcs = new ZKProcedureCoordinatorRpcs(
+       master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, serverName);
+   this.coordinator = new ProcedureCoordinator(rpcs, pool, timeoutMs, wakeMillis);
+
+   this.executorService = master.getExecutorService();
+   resetTempDir();
+ }
+
+
+ /**
+  * @return the signature of the snapshot procedure, which is
+  *         {@link #ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION}
+  */
+ @Override
+ public String getProcedureSignature() {
+   return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
+ }
+
+
+ /**
+  * Executes the procedure by converting its description into a snapshot description and
+  * passing that to {@code takeSnapshot}.
+  * @param desc procedure description to run as a snapshot
+  * @throws IOException if the description cannot be converted or {@code takeSnapshot} throws
+  */
+ @Override
+ public void execProcedure(ProcedureDescription desc) throws IOException {
+   final SnapshotDescription snapshot = toSnapshotDescription(desc);
+   takeSnapshot(snapshot);
+ }
+
+
+ /**
+  * Reports completion by converting the description into a snapshot description and asking
+  * {@code isSnapshotDone} about it.
+  * @param desc procedure description to check
+  * @return whatever {@code isSnapshotDone} returns for the converted description
+  * @throws IOException if the description cannot be converted or {@code isSnapshotDone} throws
+  */
+ @Override
+ public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+   final SnapshotDescription snapshot = toSnapshotDescription(desc);
+   return isSnapshotDone(snapshot);
+ }
+
+
+ /**
+  * Translates a generic procedure description into a snapshot description. The procedure
+  * instance becomes the snapshot name, the "table" configuration entry (name matched
+  * case-insensitively) becomes the table, and the type is always FLUSH.
+  * @param desc procedure description to translate
+  * @return the equivalent snapshot description
+  * @throws IOException if the description has no instance or no "table" entry
+  */
+ private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
+     throws IOException {
+   if (!desc.hasInstance()) {
+     throw new IOException("Snapshot name is not defined: " + desc.toString());
+   }
+
+   // no early exit: when several "table" entries are present the last one wins
+   String tableProperty = null;
+   for (NameStringPair entry : desc.getConfigurationList()) {
+     if ("table".equalsIgnoreCase(entry.getName())) {
+       tableProperty = entry.getValue();
+     }
+   }
+   if (tableProperty == null) {
+     throw new IOException("Snapshot table is not defined: " + desc.toString());
+   }
+
+   // the table name goes through TableName before it is stored in the description
+   return SnapshotDescription.newBuilder()
+       .setTable(TableName.valueOf(tableProperty).getNameAsString())
+       .setName(desc.getInstance())
+       .setType(SnapshotDescription.Type.FLUSH)
+       .build();
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java Wed Jan 22 01:42:49 2014
@@ -192,6 +192,8 @@ public abstract class TakeSnapshotHandle
completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
status.markComplete("Snapshot " + snapshot.getName() + " of table " + snapshotTable
+ " completed");
+ LOG.info("Snapshot " + snapshot.getName() + " of table " + snapshotTable
+ + " completed");
metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
} catch (Exception e) {
status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+/**
+* A life-cycle management interface for globally barriered procedures on master.
+* See the following doc on details of globally barriered procedure:
+* https://issues.apache.org/jira/secure/attachment/12555103/121127-global-barrier-proc.pdf
+*
+* To implement a custom globally barriered procedure, a user needs to extend two classes:
+* {@link MasterProcedureManager} and {@link RegionServerProcedureManager}. Implementation of
+* {@link MasterProcedureManager} is loaded into {@link HMaster} process via configuration
+* parameter 'hbase.procedure.master.classes', while implementation of
+* {@link RegionServerProcedureManager} is loaded into {@link HRegionServer} process via
+* configuration parameter 'hbase.procedure.regionserver.classes'.
+*
+* An example of globally barriered procedure implementation is {@link SnapshotManager} and
+* {@link RegionServerSnapshotManager}.
+*
+* A globally barriered procedure is identified by its signature (usually it is the name of the
+* procedure znode). During the initialization phase, the initialize methods are called by both
+* {@link HMaster} and {@link HRegionServer} which create the procedure znode and register the
+* listeners. A procedure can be triggered by its signature and an instance name (encapsulated in
+* a {@link ProcedureDescription} object). When the servers are shut down, the stop methods on both
+* classes are called to clean up the data associated with the procedure.
+*/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MasterProcedureManager extends ProcedureManager implements
+ Stoppable {
+ /**
+ * Initialize a globally barriered procedure for master.
+ *
+ * @param master Master service interface
+ * @param metricsMaster Master metrics instance made available to the implementation
+ * @throws KeeperException
+ * @throws IOException
+ * @throws UnsupportedOperationException
+ */
+ public abstract void initialize(MasterServices master, MetricsMaster metricsMaster)
+ throws KeeperException, IOException, UnsupportedOperationException;
+
+ /**
+ * Execute a distributed procedure on cluster
+ *
+ * @param desc Procedure description
+ * @throws IOException
+ */
+ public abstract void execProcedure(ProcedureDescription desc) throws IOException;
+
+ /**
+ * Check if the procedure is finished successfully
+ *
+ * @param desc Procedure description
+ * @return true if the specified procedure is finished successfully
+ * @throws IOException
+ */
+ public abstract boolean isProcedureDone(ProcedureDescription desc) throws IOException;
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/MasterProcedureManagerHost.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.Hashtable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Provides the globally barriered procedure framework and environment for
+ * master oriented operations. {@link HMaster} interacts with the loaded
+ * procedure manager through this class.
+ */
+public class MasterProcedureManagerHost extends
+    ProcedureManagerHost<MasterProcedureManager> {
+
+  /** Procedure managers indexed by signature, backing {@link #getProcedureManager(String)}. */
+  private Hashtable<String, MasterProcedureManager> procedureMgrMap
+      = new Hashtable<String, MasterProcedureManager>();
+
+  /**
+   * Registers a procedure manager and indexes it by signature immediately, so that it can be
+   * looked up even when it is registered after {@link #loadProcedures(Configuration)} ran.
+   * An already indexed signature is left untouched, which matches the set of managers kept by
+   * the parent class (a manager equal to an existing one is not added again).
+   *
+   * @param obj the procedure manager to register
+   */
+  @Override
+  public void register(MasterProcedureManager obj) {
+    super.register(obj);
+    String signature = obj.getProcedureSignature();
+    if (!procedureMgrMap.containsKey(signature)) {
+      procedureMgrMap.put(signature, obj);
+    }
+  }
+
+  /**
+   * Loads the procedure managers configured for the master and (re)builds the signature
+   * index from every known manager, registered or configured.
+   *
+   * @param conf configuration naming the master procedure manager classes
+   */
+  @Override
+  public void loadProcedures(Configuration conf) {
+    loadUserProcedures(conf, MASTER_PROCEUDRE_CONF_KEY);
+    for (MasterProcedureManager mpm : getProcedureManagers()) {
+      procedureMgrMap.put(mpm.getProcedureSignature(), mpm);
+    }
+  }
+
+  /**
+   * Initializes every known procedure manager with the given master and metrics.
+   *
+   * @param master master service interface handed to each manager
+   * @param metricsMaster master metrics instance handed to each manager
+   * @throws KeeperException if a manager's initialization throws it
+   * @throws IOException if a manager's initialization throws it
+   * @throws UnsupportedOperationException if a manager's initialization throws it
+   */
+  public void initialize(MasterServices master, final MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    for (MasterProcedureManager mpm : getProcedureManagers()) {
+      mpm.initialize(master, metricsMaster);
+    }
+  }
+
+  /**
+   * Stops every known procedure manager.
+   *
+   * @param why reason passed on to each manager
+   */
+  public void stop(String why) {
+    for (MasterProcedureManager mpm : getProcedureManagers()) {
+      mpm.stop(why);
+    }
+  }
+
+  /**
+   * @param signature signature of the wanted procedure
+   * @return the manager indexed under the signature, or null if there is none
+   */
+  public MasterProcedureManager getProcedureManager(String signature) {
+    return procedureMgrMap.get(signature);
+  }
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ProcedureManager {
+
+  /**
+   * Return the unique signature of the procedure. This signature uniquely
+   * identifies the procedure. By default, this signature is the string used in
+   * the procedure controller (i.e., the root ZK node name for the procedure)
+   */
+  public abstract String getProcedureSignature();
+
+  /**
+   * Two procedure managers are equal when their signatures are equal; any object that is
+   * not a procedure manager is never equal to one.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ProcedureManager) {
+      String mine = getProcedureSignature();
+      String theirs = ((ProcedureManager) obj).getProcedureSignature();
+      return mine.equals(theirs);
+    }
+    return false;
+  }
+
+  /** Hash of the signature, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    String signature = getProcedureSignature();
+    return signature.hashCode();
+  }
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureManagerHost.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Provides the common setup framework and runtime services for globally
+ * barriered procedure invocation from HBase services.
+ * @param <E> the specific procedure management extension that a concrete
+ * implementation provides
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ProcedureManagerHost<E extends ProcedureManager> {
+
+  /** Configuration key listing the region server procedure manager classes. */
+  public static final String REGIONSERVER_PROCEDURE_CONF_KEY =
+      "hbase.procedure.regionserver.classes";
+  /** Configuration key listing the master procedure manager classes. */
+  public static final String MASTER_PROCEDURE_CONF_KEY =
+      "hbase.procedure.master.classes";
+  /** Misspelled name of {@link #MASTER_PROCEDURE_CONF_KEY}; kept so existing users compile. */
+  public static final String MASTER_PROCEUDRE_CONF_KEY = MASTER_PROCEDURE_CONF_KEY;
+
+  private static final Log LOG = LogFactory.getLog(ProcedureManagerHost.class);
+
+  /** All known procedure managers, both registered and loaded from configuration. */
+  protected Set<E> procedures = new HashSet<E>();
+
+  /**
+   * Load user procedures. Reads the class names stored under the given configuration key,
+   * instantiates each class and adds the instances to the set of procedure managers.
+   * A class that cannot be found or instantiated is logged and skipped; the remaining
+   * classes are still loaded.
+   *
+   * @param conf configuration to read the class names from
+   * @param confKey key whose value is the list of class names
+   */
+  protected void loadUserProcedures(Configuration conf, String confKey) {
+    // load the procedure classes named in the configuration
+    String[] procClasses = conf.getStrings(confKey);
+    if (procClasses == null || procClasses.length == 0) {
+      return;
+    }
+
+    // the host's own class loader resolves every configured class; it is also installed as
+    // the thread's context class loader and is not restored afterwards
+    ClassLoader cl = this.getClass().getClassLoader();
+    Thread.currentThread().setContextClassLoader(cl);
+
+    List<E> configured = new ArrayList<E>();
+    for (String className : procClasses) {
+      className = className.trim();
+      try {
+        configured.add(loadInstance(cl.loadClass(className)));
+        LOG.info("User procedure " + className + " was loaded successfully.");
+      } catch (ClassNotFoundException e) {
+        // pass the exception along so the stack trace reaches the log
+        LOG.warn("Class " + className + " cannot be found. " +
+            e.getMessage(), e);
+      } catch (IOException e) {
+        LOG.warn("Load procedure " + className + " failed. " +
+            e.getMessage(), e);
+      }
+    }
+
+    // add entire set to the collection
+    procedures.addAll(configured);
+  }
+
+  /**
+   * Creates an instance of the given class through its no-argument constructor.
+   *
+   * @param implClass class to instantiate
+   * @return the new instance
+   * @throws IOException if the class is not a {@link ProcedureManager} or cannot be
+   *         instantiated or accessed
+   */
+  @SuppressWarnings("unchecked")
+  public E loadInstance(Class<?> implClass) throws IOException {
+    // Reject classes of the wrong kind up front. Otherwise the cast below would fail with an
+    // unchecked ClassCastException, which loadUserProcedures does not handle. Only the bound
+    // of E can be checked here, because the concrete E is not known at runtime.
+    if (!ProcedureManager.class.isAssignableFrom(implClass)) {
+      throw new IOException("Class " + implClass.getName() + " is not a "
+          + ProcedureManager.class.getName());
+    }
+
+    try {
+      return (E) implClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Register a procedure manager object.
+   *
+   * @param obj the procedure manager to add
+   */
+  public void register(E obj) {
+    procedures.add(obj);
+  }
+
+  /**
+   * @return a copy of the set of known procedure managers; changing the returned set does
+   *         not affect this host
+   */
+  public Set<E> getProcedureManagers() {
+    return new HashSet<E>(procedures);
+  }
+
+  /**
+   * Load the procedure managers of the concrete host from the configuration.
+   *
+   * @param conf configuration to read from
+   */
+  public abstract void loadProcedures(Configuration conf);
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A life-cycle management interface for globally barriered procedures on
+ * region servers.
+ * <p>
+ * The life cycle consists of three steps: {@link #initialize(RegionServerServices)},
+ * {@link #start()} and {@link #stop(boolean)}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class RegionServerProcedureManager extends ProcedureManager {
+ /**
+ * Initialize a globally barriered procedure for region servers.
+ *
+ * @param rss Region Server service interface
+ * @throws KeeperException NOTE(review): presumably when the zookeeper cluster cannot
+ * be reached during setup; confirm against the implementations
+ */
+ public abstract void initialize(RegionServerServices rss) throws KeeperException;
+
+ /**
+ * Start accepting procedure requests.
+ */
+ public abstract void start();
+
+ /**
+ * Close <tt>this</tt> and all running procedure tasks
+ *
+ * @param force forcefully stop all running tasks
+ * @throws IOException if the implementation fails to close cleanly
+ */
+ public abstract void stop(boolean force) throws IOException;
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Provides the globally barriered procedure framework and environment
+ * for region server oriented operations. {@link HRegionServer} interacts
+ * with the loaded procedure manager through this class.
+ */
+public class RegionServerProcedureManagerHost extends
+ ProcedureManagerHost<RegionServerProcedureManager> {
+
+ private static final Log LOG = LogFactory
+ .getLog(RegionServerProcedureManagerHost.class);
+
+ public void initialize(RegionServerServices rss) throws KeeperException {
+ for (RegionServerProcedureManager proc : procedures) {
+ LOG.info("Procedure " + proc.getProcedureSignature() + " is initializing");
+ proc.initialize(rss);
+ LOG.info("Procedure " + proc.getProcedureSignature() + " is initialized");
+ }
+ }
+
+ public void start() {
+ for (RegionServerProcedureManager proc : procedures) {
+ LOG.info("Procedure " + proc.getProcedureSignature() + " is starting");
+ proc.start();
+ LOG.info("Procedure " + proc.getProcedureSignature() + " is started");
+ }
+ }
+
+ public void stop(boolean force) {
+ for (RegionServerProcedureManager proc : procedures) {
+ try {
+ proc.stop(force);
+ } catch (IOException e) {
+ LOG.warn("Failed to close procedure " + proc.getProcedureSignature()
+ + " cleanly", e);
+ }
+ }
+ }
+
+ @Override
+ public void loadProcedures(Configuration conf) {
+ loadUserProcedures(conf, REGIONSERVER_PROCEDURE_CONF_KEY);
+ // load the default snapshot manager
+ procedures.add(new RegionServerSnapshotManager());
+ }
+
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jan 22 01:42:49 2014
@@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.ipc.Serve
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -489,8 +490,7 @@ public class HRegionServer implements Cl
private RegionServerCoprocessorHost rsHost;
- /** Handle all the snapshot requests to this server */
- RegionServerSnapshotManager snapshotManager;
+ private RegionServerProcedureManagerHost rspmHost;
// configuration setting on if replay WAL edits directly to another RS
private final boolean distributedLogReplay;
@@ -749,11 +749,13 @@ public class HRegionServer implements Cl
this.abort("Failed to retrieve Cluster ID",e);
}
- // watch for snapshots
+ // watch for snapshots and other procedures
try {
- this.snapshotManager = new RegionServerSnapshotManager(this);
+ rspmHost = new RegionServerProcedureManagerHost();
+ rspmHost.loadProcedures(conf);
+ rspmHost.initialize(this);
} catch (KeeperException e) {
- this.abort("Failed to reach zk cluster when creating snapshot handler.");
+ this.abort("Failed to reach zk cluster when creating procedure handler.", e);
}
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
@@ -856,8 +858,9 @@ public class HRegionServer implements Cl
}
if (!this.stopped && isHealthy()){
- // start the snapshot handler, since the server is ready to run
- this.snapshotManager.start();
+ // start the snapshot handler and other procedure handlers,
+ // since the server is ready to run
+ rspmHost.start();
}
// We registered with the Master. Go into run mode.
@@ -945,12 +948,8 @@ public class HRegionServer implements Cl
this.nonceManagerChore.interrupt();
}
- // Stop the snapshot handler, forcefully killing all running tasks
- try {
- if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
- } catch (IOException e) {
- LOG.warn("Failed to close snapshot handler cleanly", e);
- }
+ // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
+ rspmHost.stop(this.abortRequested || this.killed);
if (this.killed) {
// Just skip out w/o closing regions. Used when testing.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Wed Jan 22 01:42:49 2014
@@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.master.sn
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -71,7 +71,7 @@ import com.google.protobuf.InvalidProtoc
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class RegionServerSnapshotManager {
+public class RegionServerSnapshotManager extends RegionServerProcedureManager {
private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
/** Maximum number of snapshot region tasks that can run concurrently */
@@ -93,9 +93,9 @@ public class RegionServerSnapshotManager
/** Default amount of time to check for errors while regions finish snapshotting */
private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
- private final RegionServerServices rss;
- private final ProcedureMemberRpcs memberRpcs;
- private final ProcedureMember member;
+ private RegionServerServices rss;
+ private ProcedureMemberRpcs memberRpcs;
+ private ProcedureMember member;
/**
* Exposed for testing.
@@ -111,32 +111,12 @@ public class RegionServerSnapshotManager
this.member = procMember;
}
- /**
- * Create a default snapshot handler - uses a zookeeper based member controller.
- * @param rss region server running the handler
- * @throws KeeperException if the zookeeper cluster cannot be reached
- */
- public RegionServerSnapshotManager(RegionServerServices rss)
- throws KeeperException {
- this.rss = rss;
- ZooKeeperWatcher zkw = rss.getZooKeeper();
- this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
- SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
-
- // read in the snapshot request configuration properties
- Configuration conf = rss.getConfiguration();
- long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
- int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
-
- // create the actual snapshot procedure member
- ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
- opThreads, keepAlive);
- this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
- }
+ /**
+ * Create an uninitialized snapshot manager. The region server services, member rpcs
+ * and procedure member are assigned later by <tt>initialize(RegionServerServices)</tt>.
+ */
+ public RegionServerSnapshotManager() {}
/**
* Start accepting snapshot requests.
*/
+ @Override
public void start() {
LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
this.memberRpcs.start(rss.getServerName().toString(), member);
@@ -147,6 +127,7 @@ public class RegionServerSnapshotManager
* @param force forcefully stop all running tasks
* @throws IOException
*/
+ @Override
public void stop(boolean force) throws IOException {
String mode = force ? "abruptly" : "gracefully";
LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
@@ -373,4 +354,33 @@ public class RegionServerSnapshotManager
this.executor.shutdownNow();
}
}
+
+ /**
+  * Initialize this snapshot handler for a region server - uses a zookeeper based
+  * member controller.
+  * @param rss region server running the handler
+  * @throws KeeperException if the zookeeper cluster cannot be reached
+  */
+ @Override
+ public void initialize(RegionServerServices rss) throws KeeperException {
+   this.rss = rss;
+   this.memberRpcs = new ZKProcedureMemberRpcs(rss.getZooKeeper(),
+       SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
+
+   // the snapshot request settings come from the region server's configuration
+   Configuration conf = rss.getConfiguration();
+   long keepAliveMillis =
+       conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+   int threadCount =
+       conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
+
+   // build the procedure member that actually runs the snapshot subprocedures
+   String serverName = rss.getServerName().toString();
+   this.member = new ProcedureMember(memberRpcs,
+       ProcedureMember.defaultPool(serverName, threadCount, keepAliveMillis),
+       new SnapshotSubprocedureBuilder());
+ }
+
+ /**
+ * @return the signature of the procedure handled by this manager, which is
+ * {@link SnapshotManager#ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION}
+ */
+ @Override
+ public String getProcedureSignature() {
+ return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
+ }
+
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Minimal master side procedure manager used by tests. It starts a procedure on every
+ * online region server, waits for it to finish and remembers whether the last run
+ * completed.
+ */
+public class SimpleMasterProcedureManager extends MasterProcedureManager {
+
+  /** Procedure signature; the region server side test manager returns the same value. */
+  public static final String SIMPLE_SIGNATURE = "simle_test";
+
+  private static final Log LOG = LogFactory.getLog(SimpleMasterProcedureManager.class);
+
+  private MasterServices master;
+  private ProcedureCoordinator coordinator;
+  private ExecutorService executorService;
+
+  /**
+   * Whether the most recent {@link #execProcedure(ProcedureDescription)} ran to
+   * completion. Volatile because it is written by the thread running the procedure and
+   * read by whichever thread asks {@link #isProcedureDone(ProcedureDescription)}.
+   */
+  private volatile boolean done;
+
+  /** Only logs the reason; nothing is actually stopped. */
+  @Override
+  public void stop(String why) {
+    LOG.info("stop: " + why);
+  }
+
+  /** @return always <tt>false</tt> */
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  /**
+   * Remembers the master services and builds a zookeeper based procedure coordinator
+   * named after the master's server name.
+   *
+   * @param master master services used to reach zookeeper, the server manager and the
+   *          executor service
+   * @param metricsMaster unused by this implementation
+   */
+  @Override
+  public void initialize(MasterServices master, MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    this.master = master;
+    this.done = false;
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
+    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+        master.getZooKeeper(), getProcedureSignature(), name);
+
+    this.coordinator = new ProcedureCoordinator(comms, tpool);
+    this.executorService = master.getExecutorService();
+  }
+
+  /** @return {@link #SIMPLE_SIGNATURE} */
+  @Override
+  public String getProcedureSignature() {
+    return SIMPLE_SIGNATURE;
+  }
+
+  /**
+   * Runs the procedure on all online region servers and blocks until it finishes.
+   * <p>
+   * An interruption or a {@link ForeignException} while waiting is handed to the
+   * procedure's exception dispatcher rather than thrown; in that case the procedure is
+   * left marked as not done.
+   *
+   * @param desc description of the procedure; its instance name is used as the
+   *          procedure name
+   * @throws IOException if the coordinator does not accept the procedure
+   */
+  @Override
+  public void execProcedure(ProcedureDescription desc) throws IOException {
+    this.done = false;
+    // start the process on the RS
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+
+    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+    List<String> servers = new ArrayList<String>();
+    for (ServerName sn : serverNames) {
+      servers.add(sn.toString());
+    }
+    Procedure proc =
+        coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
+    if (proc == null) {
+      String msg = "Failed to submit distributed procedure for '"
+          + getProcedureSignature() + "'";
+      LOG.error(msg);
+      throw new HBaseSnapshotException(msg);
+    }
+
+    try {
+      // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+      // if it takes too long.
+      proc.waitForCompleted();
+      LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+      this.done = true;
+    } catch (InterruptedException e) {
+      ForeignException ee =
+          new ForeignException("Interrupted while waiting for procdure to finish", e);
+      monitor.receive(ee);
+      // keep the interrupt status visible to the caller
+      Thread.currentThread().interrupt();
+    } catch (ForeignException e) {
+      monitor.receive(e);
+    }
+  }
+
+  /**
+   * @param desc ignored; only the outcome of the most recent run is tracked
+   * @return <tt>true</tt> if the most recent {@link #execProcedure(ProcedureDescription)}
+   *         completed
+   */
+  @Override
+  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+    return done;
+  }
+
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Minimal region server side procedure manager used by tests. Each procedure request is
+ * turned into a {@link SimpleSubprocedure} whose only work is to log a message from a
+ * pooled task.
+ */
+public class SimpleRSProcedureManager extends RegionServerProcedureManager {
+
+  private static final Log LOG = LogFactory.getLog(SimpleRSProcedureManager.class);
+
+  private RegionServerServices rss;
+  private ProcedureMemberRpcs memberRpcs;
+  private ProcedureMember member;
+
+  /**
+   * Remembers the region server services and builds the zookeeper based member rpcs and
+   * the procedure member.
+   *
+   * @param rss region server services of the hosting region server
+   * @throws KeeperException declared by the zookeeper based member rpcs setup
+   */
+  @Override
+  public void initialize(RegionServerServices rss) throws KeeperException {
+    this.rss = rss;
+    ZooKeeperWatcher zkw = rss.getZooKeeper();
+    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
+
+    ThreadPoolExecutor pool =
+        ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
+    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
+    LOG.info("Initialized: " + rss.getServerName().toString());
+  }
+
+  /** Starts the member rpcs under this region server's name. */
+  @Override
+  public void start() {
+    this.memberRpcs.start(rss.getServerName().toString(), member);
+    LOG.info("Started.");
+  }
+
+  /**
+   * Closes the procedure member and then, even if that fails, the member rpcs.
+   *
+   * @param force only logged by this implementation
+   */
+  @Override
+  public void stop(boolean force) throws IOException {
+    LOG.info("stop: " + force);
+    try {
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /** @return the signature shared with {@link SimpleMasterProcedureManager} */
+  @Override
+  public String getProcedureSignature() {
+    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
+  }
+
+  /**
+   * If in a running state, creates the specified subprocedure for handling a procedure.
+   * A new task pool is created for every subprocedure.
+   *
+   * @param name name of the procedure
+   * @return Subprocedure to submit to the ProcedureMember.
+   * @throws IllegalStateException if the region server is stopping or stopped
+   */
+  public Subprocedure buildSubprocedure(String name) {
+
+    // don't run a procedure if the parent is stop(ping)
+    if (rss.isStopping() || rss.isStopped()) {
+      throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
+          + ", because stopping/stopped!");
+    }
+
+    LOG.info("Attempting to run a procedure.");
+    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+    Configuration conf = rss.getConfiguration();
+
+    SimpleSubprocedurePool taskManager =
+        new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
+    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
+  }
+
+  /**
+   * Build the actual procedure runner that will do all the 'hard' work
+   */
+  public class SimleSubprocedureBuilder implements SubprocedureFactory {
+
+    /**
+     * @param name name of the procedure
+     * @param data ignored by this implementation
+     */
+    @Override
+    public Subprocedure buildSubprocedure(String name, byte[] data) {
+      LOG.info("Building procedure: " + name);
+      return SimpleRSProcedureManager.this.buildSubprocedure(name);
+    }
+  }
+
+  /**
+   * Single threaded task pool owned by one subprocedure. Tasks are tracked so that the
+   * owner can wait for all of them.
+   */
+  public class SimpleSubprocedurePool implements Closeable, Abortable {
+
+    private final ExecutorCompletionService<Void> taskPool;
+    private final ThreadPoolExecutor executor;
+    private volatile boolean aborted;
+    private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+    private final String name;
+
+    /**
+     * @param name used in the worker thread name and in wrapped exceptions
+     * @param conf unused by this implementation
+     */
+    public SimpleSubprocedurePool(String name, Configuration conf) {
+      this.name = name;
+      executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>(),
+          new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
+      taskPool = new ExecutorCompletionService<Void>(executor);
+    }
+
+    /**
+     * Submit a task to the pool.
+     */
+    public void submitTask(final Callable<Void> task) {
+      Future<Void> f = this.taskPool.submit(task);
+      futures.add(f);
+    }
+
+    /**
+     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
+     * Tasks that are still unfinished when this method exits are cancelled.
+     *
+     * @return <tt>true</tt> on success, <tt>false</tt> if interrupted without being aborted
+     * @throws ForeignException if a task failed, or if interrupted after an abort
+     */
+    public boolean waitForOutstandingTasks() throws ForeignException {
+      LOG.debug("Waiting for procedure to finish.");
+
+      try {
+        for (Future<Void> f: futures) {
+          f.get();
+        }
+        return true;
+      } catch (InterruptedException e) {
+        if (aborted) throw new ForeignException(
+            "Interrupted and found to be aborted while waiting for tasks!", e);
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof ForeignException) {
+          throw (ForeignException) e.getCause();
+        }
+        throw new ForeignException(name, e.getCause());
+      } finally {
+        // close off remaining tasks
+        for (Future<Void> f: futures) {
+          if (!f.isDone()) {
+            f.cancel(true);
+          }
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
+     * finish
+     */
+    @Override
+    public void close() {
+      executor.shutdown();
+    }
+
+    /**
+     * Marks the pool aborted and interrupts its tasks. Only the first call has an effect.
+     */
+    @Override
+    public void abort(String why, Throwable e) {
+      if (this.aborted) return;
+
+      this.aborted = true;
+      LOG.warn("Aborting because: " + why, e);
+      this.executor.shutdownNow();
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.aborted;
+    }
+  }
+
+  /**
+   * Subprocedure that does its work inside the barrier by running a single
+   * {@link RSSimpleTask} on its own task pool.
+   */
+  public class SimpleSubprocedure extends Subprocedure {
+    private final RegionServerServices rss;
+    private final SimpleSubprocedurePool taskManager;
+
+    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
+        ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
+      // NOTE(review): 500 and 60000 are presumably the wake frequency and timeout in
+      // milliseconds; confirm against Subprocedure's constructor
+      super(member, name, errorListener, 500, 60000);
+      LOG.info("Constructing a SimpleSubprocedure.");
+      this.rss = rss;
+      this.taskManager = taskManager;
+    }
+
+    /**
+     * Callable task that only logs the name of the region server it runs on.
+     * TODO. We don't need a thread pool to execute this. This can be simplified
+     * with no use of subprocedurepool.
+     */
+    class RSSimpleTask implements Callable<Void> {
+      RSSimpleTask() {}
+
+      @Override
+      public Void call() throws Exception {
+        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
+        return null;
+      }
+
+    }
+
+    /**
+     * Submits the task, waits for it and rethrows any error reported to the monitor in
+     * between. The task pool is shut down afterwards in every case.
+     */
+    private void execute() throws ForeignException {
+      try {
+        monitor.rethrowException();
+
+        // running a task (e.g., roll log, flush table) on region server
+        taskManager.submitTask(new RSSimpleTask());
+        monitor.rethrowException();
+
+        // wait for everything to complete.
+        taskManager.waitForOutstandingTasks();
+        monitor.rethrowException();
+      } finally {
+        // The pool belongs to this subprocedure alone. Without a shutdown its core
+        // worker thread would stay alive after a successful run, because cleanup()
+        // is the only other place that stops the executor.
+        taskManager.close();
+      }
+    }
+
+    @Override
+    public void acquireBarrier() throws ForeignException {
+      // do nothing, executing in inside barrier step.
+    }
+
+    /**
+     * Run the simple task and wait for it to finish.
+     */
+    @Override
+    public void insideBarrier() throws ForeignException {
+      execute();
+    }
+
+    /**
+     * Cancel threads if they haven't finished.
+     */
+    @Override
+    public void cleanup(Exception e) {
+      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
+    }
+  }
+
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java?rev=1560234&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureManager.java Wed Jan 22 01:42:49 2014
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Boots a mini cluster configured with the simple test procedure managers and runs one
+ * procedure through {@link HBaseAdmin}.
+ */
+@Category(SmallTests.class)
+public class TestProcedureManager {
+
+  static final Log LOG = LogFactory.getLog(TestProcedureManager.class);
+  private static final int NUM_RS = 2;
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Configures which master and region server procedure managers should be loaded, then
+   * starts the mini cluster.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    String masterManager = SimpleMasterProcedureManager.class.getName();
+    String regionServerManager = SimpleRSProcedureManager.class.getName();
+
+    Configuration clusterConf = util.getConfiguration();
+    clusterConf.set(ProcedureManagerHost.MASTER_PROCEUDRE_CONF_KEY, masterManager);
+    clusterConf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionServerManager);
+
+    util.startMiniCluster(NUM_RS);
+  }
+
+  /** Shuts the mini cluster down again. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Executes the simple procedure with no properties; the test passes when the call
+   * returns without throwing.
+   */
+  @Test
+  public void testSimpleProcedureManager() throws IOException {
+    HashMap<String, String> noProperties = new HashMap<String, String>();
+    util.getHBaseAdmin().execProcedure(SimpleMasterProcedureManager.SIMPLE_SIGNATURE,
+        "mytest", noProperties);
+  }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java?rev=1560234&r1=1560233&r2=1560234&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java Wed Jan 22 01:42:49 2014
@@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -185,6 +187,60 @@ public class TestFlushSnapshotFromClient
admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
}
+
+ /**
+ * Take a flush snapshot of an online table through HBaseAdmin.execProcedure() and validate it
+ * @throws Exception if any step of the test fails
+ */
+ @Test (timeout=300000)
+ public void testFlushTableSnapshotWithProcedure() throws Exception {
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+ // make sure we don't fail on listing snapshots
+ SnapshotTestingUtils.assertNoSnapshots(admin);
+
+ // put some stuff in the table
+ HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); // NOTE(review): never closed here
+ SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
+
+ // get the name of all the regionservers hosting the snapshotted table
+ Set<String> snapshotServers = new HashSet<String>();
+ List<RegionServerThread> servers = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
+ for (RegionServerThread server : servers) {
+ if (server.getRegionServer().getOnlineRegions(TABLE_NAME).size() > 0) {
+ snapshotServers.add(server.getRegionServer().getServerName().toString());
+ }
+ }
+
+ LOG.debug("FS state before snapshot:");
+ FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
+ FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
+
+ // take a snapshot of the enabled table; the target table is passed via the "table" property
+ String snapshotString = "offlineTableSnapshot"; // NOTE(review): "offline" name, enabled table
+ byte[] snapshot = Bytes.toBytes(snapshotString);
+ Map<String, String> props = new HashMap<String, String>();
+ props.put("table", STRING_TABLE_NAME);
+ admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION,
+ snapshotString, props);
+
+ // the checks below rely on the snapshot already existing once execProcedure() has returned
+ LOG.debug("Snapshot completed.");
+
+ // make sure we have the snapshot
+ List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
+ snapshot, TABLE_NAME);
+
+ // make sure it's a valid snapshot
+ FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ LOG.debug("FS state after snapshot:");
+ FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
+ FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
+
+ SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
+ admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
+ }
+
@Test (timeout=300000)
public void testSnapshotFailsOnNonExistantTable() throws Exception {
HBaseAdmin admin = UTIL.getHBaseAdmin();