You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/01/04 17:56:54 UTC
[09/29] hbase git commit: HBASE-14030 Revert due to pending review
comments
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java
deleted file mode 100644
index ae21b33..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-/**
- * The main class which interprets the given arguments and triggers the restore operation.
- */
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public final class RestoreClient {
-
- // Logger shared by all static methods of this class.
- private static final Log LOG = LogFactory.getLog(RestoreClient.class);
-
- // Supported command-line options; populated by init() and consumed by parseAndRun().
- private static Options opt;
- // Configuration used by the static methods; assigned by init()/setConf() or lazily by getConf().
- private static Configuration conf;
- // Image set returned by restore_stage2 during the most recent successful restore_stage1 call;
- // exposed through getLastRestoreImagesSet().
- private static Set<BackupImage> lastRestoreImagesSet;
-
- // delimiter in tablename list in restore command
- private static final String DELIMITER_IN_COMMAND = ",";
-
- // Names of the boolean command-line flags registered in init() and queried in parseAndRun().
- private static final String OPTION_OVERWRITE = "overwrite";
- private static final String OPTION_CHECK = "check";
- private static final String OPTION_AUTOMATIC = "automatic";
-
- // Help text printed by parseAndRun() when arguments are missing or when the table mapping
- // does not have the same number of entries as the table list.
- private static final String USAGE =
- "Usage: hbase restore <backup_root_path> <backup_id> <tables> [tableMapping] \n"
- + " [-overwrite] [-check] [-automatic]\n"
- + " backup_root_path The parent location where the backup images are stored\n"
- + " backup_id The id identifying the backup image\n"
- + " table(s) Table(s) from the backup image to be restored.\n"
- + " Tables are separated by comma.\n"
- + " Options:\n"
- + " tableMapping A comma separated list of target tables.\n"
- + " If specified, each table in <tables> must have a mapping.\n"
- + " -overwrite With this option, restore overwrites to the existing table "
- + "if there's any in\n"
- + " restore target. The existing table must be online before restore.\n"
- + " -check With this option, restore sequence and dependencies are checked\n"
- + " and verified without executing the restore\n"
- + " -automatic With this option, all the dependencies are automatically restored\n"
- + " together with this backup image following the correct order.\n"
- + " The restore dependencies can be checked by using \"-check\" "
- + "option,\n"
- + " or using \"hbase backup describe\" command. Without this option, "
- + "only\n" + " this backup image is restored\n";
-
- /** Not meant to be instantiated: the class only offers static entry points. */
- private RestoreClient() {
-   throw new AssertionError("Instantiating utility class...");
- }
-
- /**
-  * Prepares the static state needed before the command line is parsed: registers the supported
-  * flags, resolves the configuration and turns off the loggers that would clutter the output.
-  * @throws IOException declared in the signature; no statement of this method throws it directly
-  */
- protected static void init() throws IOException {
-   // build the option set locally and publish it once it is complete
-   Options supported = new Options();
-   supported.addOption(OPTION_OVERWRITE, false,
-     "Overwrite the data if any of the restore target tables exists");
-   supported.addOption(OPTION_CHECK, false, "Check restore sequence and dependencies");
-   supported.addOption(OPTION_AUTOMATIC, false, "Restore all dependencies");
-   supported.addOption("debug", false, "Enable debug logging");
-   opt = supported;
-
-   conf = getConf();
-
-   // keep unrelated loggers from polluting the command output
-   disableUselessLoggers();
- }
-
- /**
-  * Command-line entry point: initializes the static state, then parses and executes the command.
-  * @param args command-line arguments, see the usage text of this class
-  * @throws IOException if the initialization fails
-  */
- public static void main(String[] args) throws IOException {
-   init();
-   parseAndRun(args);
- }
-
- private static void parseAndRun(String[] args) {
- CommandLine cmd = null;
- try {
- cmd = new PosixParser().parse(opt, args);
- } catch (ParseException e) {
- LOG.error("Could not parse command", e);
- System.exit(-1);
- }
-
- // enable debug logging
- Logger backupClientLogger = Logger.getLogger("org.apache.hadoop.hbase.backup");
- if (cmd.hasOption("debug")) {
- backupClientLogger.setLevel(Level.DEBUG);
- }
-
- // whether to overwrite to existing table if any, false by default
- boolean isOverwrite = cmd.hasOption(OPTION_OVERWRITE);
- if (isOverwrite) {
- LOG.debug("Found -overwrite option in restore command, "
- + "will overwrite to existing table if any in the restore target");
- }
-
- // whether to only check the dependencies, false by default
- boolean check = cmd.hasOption(OPTION_CHECK);
- if (check) {
- LOG.debug("Found -check option in restore command, "
- + "will check and verify the dependencies");
- }
-
- // whether to restore all dependencies, false by default
- boolean autoRestore = cmd.hasOption(OPTION_AUTOMATIC);
- if (autoRestore) {
- LOG.debug("Found -automatic option in restore command, "
- + "will automatically retore all the dependencies");
- }
-
- // parse main restore command options
- String[] remainArgs = cmd.getArgs();
- if (remainArgs.length < 3) {
- System.out.println("ERROR: missing arguments");
- System.out.println(USAGE);
- System.exit(-1);
- }
-
- String backupRootDir = remainArgs[0];
- String backupId = remainArgs[1];
- String tables = remainArgs[2];
-
- String tableMapping = (remainArgs.length > 3) ? remainArgs[3] : null;
-
- String[] sTableArray = (tables != null) ? tables.split(DELIMITER_IN_COMMAND) : null;
- String[] tTableArray = (tableMapping != null) ? tableMapping.split(DELIMITER_IN_COMMAND) : null;
-
- if (tableMapping != null && tTableArray != null && (sTableArray.length != tTableArray.length)) {
- System.err.println("ERROR: table mapping mismatch: " + tables + " : " + tableMapping);
- System.out.println(USAGE);
- System.exit(-1);
- }
-
- try {
- HBackupFileSystem hBackupFS = new HBackupFileSystem(conf, new Path(backupRootDir), backupId);
- restore_stage1(hBackupFS, backupRootDir, backupId, check, autoRestore, sTableArray,
- tTableArray, isOverwrite);
- } catch (IOException e) {
- System.err.println("ERROR: " + e.getMessage());
- System.exit(-1);
- }
- }
-
- /**
-  * First stage of a restore: loads the backup manifests of the requested tables, optionally
-  * validates the image dependencies, verifies the target tables and then hands over to stage 2.
-  * @param hBackupFS to access the backup image
-  * @param backupRootDir root directory of the backup image (not read by this method)
-  * @param backupId id of the backup image to be restored (not read by this method)
-  * @param check when true, only the dependency check is performed
-  * @param autoRestore when true, the dependencies are validated and restored as well
-  * @param sTableArray tables to be restored
-  * @param tTableArray tables to restore to; when null the source table names are used
-  * @param isOverwrite when true, restoring onto existing target tables is allowed, otherwise
-  *          the request fails if a target table exists
-  * @return true when only the dependency check was done, false when a restore was performed
-  * @throws IOException if the validation, the target table check or the restore fails
-  */
- public static boolean restore_stage1(HBackupFileSystem hBackupFS, String backupRootDir,
-     String backupId, boolean check, boolean autoRestore, String[] sTableArray,
-     String[] tTableArray, boolean isOverwrite) throws IOException {
-
-   // load (and thereby check) the image manifest of every requested table
-   HashMap<String, BackupManifest> manifests = new HashMap<String, BackupManifest>();
-   hBackupFS.checkImageManifestExist(manifests, sTableArray);
-
-   try {
-     // dependency validation is needed for a pure check as well as for an automatic restore
-     if (check || autoRestore) {
-       if (!validate(manifests)) {
-         String errMsg = "Some dependencies are missing for restore";
-         LOG.error(errMsg);
-         throw new IOException(errMsg);
-       }
-       LOG.info("Checking backup images: ok");
-     }
-
-     // a check-only request ends here
-     if (check) {
-       return true;
-     }
-
-     // without an explicit mapping every table is restored under its own name
-     String[] targets = (tTableArray != null) ? tTableArray : sTableArray;
-     checkTargetTables(targets, isOverwrite);
-
-     Set<BackupImage> restored =
-         restore_stage2(hBackupFS, manifests, sTableArray, targets, autoRestore);
-
-     LOG.info("Restore for " + Arrays.asList(sTableArray) + " are successful!");
-     lastRestoreImagesSet = restored;
-
-     // a real restore was executed
-     return false;
-   } catch (IOException e) {
-     LOG.error("ERROR: restore failed with error: " + e.getMessage());
-     throw e;
-   }
- }
-
- /**
-  * Returns the image set recorded by the most recent restore that completed in
-  * {@code restore_stage1}. The value is held in a static field, i.e. it is shared globally.
-  * @return the last restore image set, or null when no restore has completed yet
-  */
- public static Set<BackupImage> getLastRestoreImagesSet() {
-   final Set<BackupImage> latest = lastRestoreImagesSet;
-   return latest;
- }
-
- /**
-  * Checks, for every table in the map, that the backup directory of each image on the table's
-  * dependency list exists. The check of a table stops at its first missing image; the remaining
-  * tables are still examined.
-  * @param backupManifestMap table name to backup manifest
-  * @return true if no missing image directory was found
-  * @throws IOException if a path check fails
-  */
- private static boolean validate(HashMap<String, BackupManifest> backupManifestMap)
-     throws IOException {
-   boolean allPresent = true;
-
-   for (Entry<String, BackupManifest> entry : backupManifestMap.entrySet()) {
-     String tableName = entry.getKey();
-
-     // sorted copy of the dependency list (ordering is defined by BackupImage itself)
-     ArrayList<BackupImage> dependencies = entry.getValue().getDependentListByTable(tableName);
-     TreeSet<BackupImage> ordered = new TreeSet<BackupImage>();
-     if (dependencies != null && !dependencies.isEmpty()) {
-       ordered.addAll(dependencies);
-     }
-
-     // todo merge
-     LOG.debug("merge will be implemented in future jira");
-
-     LOG.info("Dependent image(s) from old to new:");
-     for (BackupImage img : ordered) {
-       String imageDir =
-           HBackupFileSystem.getTableBackupDir(img.getRootDir(), img.getBackupId(), tableName);
-       if (HBackupFileSystem.checkPathExist(imageDir, getConf())) {
-         // TODO More validation?
-         LOG.info("Backup image: " + img.getBackupId() + " for '" + tableName + "' is available");
-         continue;
-       }
-       LOG.error("ERROR: backup image does not exist: " + imageDir);
-       allPresent = false;
-       break;
-     }
-   }
-   return allPresent;
- }
-
- /**
-  * Validates the restore target tables. A target that already exists is only accepted when
-  * {@code isOverwrite} is set, and in that case it must not be disabled.
-  * @param tTableArray target tables
-  * @param isOverwrite whether restoring onto existing tables is allowed
-  * @throws IOException if an existing target is found without overwrite, if an existing target
-  *           is disabled with overwrite, or if the cluster cannot be queried
-  */
- private static void checkTargetTables(String[] tTableArray, boolean isOverwrite)
-     throws IOException {
-   ArrayList<String> existTableList = new ArrayList<String>();
-   ArrayList<String> disabledTableList = new ArrayList<String>();
-
-   // Resolve the configuration through getConf() so that callers of the public
-   // restore_stage1 that never ran init()/setConf() do not hit a null configuration.
-   Connection conn = ConnectionFactory.createConnection(getConf());
-   try {
-     HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
-     try {
-       // check if the tables already exist
-       for (String tableName : tTableArray) {
-         TableName target = TableName.valueOf(tableName);
-         if (admin.tableExists(target)) {
-           existTableList.add(tableName);
-           if (admin.isTableDisabled(target)) {
-             disabledTableList.add(tableName);
-           }
-         } else {
-           LOG.info("HBase table " + tableName
-               + " does not exist. It will be created during restore process");
-         }
-       }
-     } finally {
-       admin.close();
-     }
-   } finally {
-     // nested finally: the connection is released even if closing the admin throws
-     conn.close();
-   }
-
-   if (existTableList.isEmpty()) {
-     return;
-   }
-
-   if (!isOverwrite) {
-     LOG.error("Existing table found in the restore target, please add \"-overwrite\" "
-         + "option in the command if you mean to restore to these existing tables");
-     LOG.info("Existing table list in restore target: " + existTableList);
-     throw new IOException("Existing table found in target while no \"-overwrite\" "
-         + "option found");
-   }
-
-   if (!disabledTableList.isEmpty()) {
-     LOG.error("Found offline table in the restore target, "
-         + "please enable them before restore with \"-overwrite\" option");
-     LOG.info("Offline table list in restore target: " + disabledTableList);
-     throw new IOException(
-         "Found offline table in the target when restore with \"-overwrite\" option");
-   }
- }
-
- /**
-  * Restore operation. Stage 2: resolves the backup image dependency of every table and restores
-  * the images. With {@code autoRestore} all images on a table's dependency list are restored in
-  * the order defined by {@code BackupImage}; otherwise only the image of the manifest itself.
-  * @param hBackupFS to access the backup image (not read by this method)
-  * @param backupManifestMap table name to backup manifest
-  * @param sTableArray the tables to be restored
-  * @param tTableArray the tables to restore to, index-aligned with {@code sTableArray}
-  * @param autoRestore true to restore all the backup images on the dependency list
-  * @return the images restored for all tables of this call
-  * @throws IOException if a manifest is missing or restoring an image fails
-  */
- private static Set<BackupImage> restore_stage2(HBackupFileSystem hBackupFS,
-     HashMap<String, BackupManifest> backupManifestMap, String[] sTableArray,
-     String[] tTableArray, boolean autoRestore) throws IOException {
-   // Accumulated over all tables. The per-table images live in their own collection so that
-   // the result is not wiped when the next table is processed.
-   TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
-
-   for (int i = 0; i < sTableArray.length; i++) {
-     String table = sTableArray[i];
-     BackupManifest manifest = backupManifestMap.get(table);
-     if (manifest == null) {
-       throw new IOException("No backup manifest was loaded for table " + table);
-     }
-
-     if (autoRestore) {
-       // Sorted copy of the dependency list of this table; a missing list is treated as empty.
-       List<BackupImage> depList = manifest.getDependentListByTable(table);
-       TreeSet<BackupImage> restoreList = new TreeSet<BackupImage>();
-       if (depList != null) {
-         restoreList.addAll(depList);
-       }
-       LOG.debug("need to clear merged Image. to be implemented in future jira");
-
-       for (BackupImage image : restoreList) {
-         restoreImage(image, table, tTableArray[i]);
-       }
-
-       if (!restoreList.isEmpty()) {
-         LOG.info("Restore includes the following image(s):");
-         for (BackupImage image : restoreList) {
-           LOG.info(" Backup: "
-               + image.getBackupId()
-               + " "
-               + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
-                 table));
-         }
-       }
-       restoreImageSet.addAll(restoreList);
-     } else {
-       BackupImage image = manifest.getBackupImage();
-       List<BackupImage> depList = manifest.getDependentListByTable(table);
-       // The dependency list always contains self.
-       if (depList != null && depList.size() > 1) {
-         LOG.warn("Backup image " + image.getBackupId() + " depends on other images.\n"
-             + "this operation will only restore the delta contained within backupImage "
-             + image.getBackupId());
-       }
-       restoreImage(image, table, tTableArray[i]);
-       restoreImageSet.add(image);
-     }
-   }
-   return restoreImageSet;
- }
-
- /**
-  * Restores a single backup image of one table, either as a full restore or as an incremental
-  * restore depending on the type recorded in the table's manifest.
-  * @param image the backup image to restore from
-  * @param sTable table to be restored
-  * @param tTable table to be restored to
-  * @throws IOException if the image cannot be read or the restore fails
-  */
- private static void restoreImage(BackupImage image, String sTable, String tTable)
-     throws IOException {
-
-   Configuration conf = getConf();
-
-   String imageRoot = image.getRootDir();
-   LOG.debug("Image root dir " + imageRoot);
-   String imageId = image.getBackupId();
-
-   HBackupFileSystem imageFs = new HBackupFileSystem(conf, new Path(imageRoot), imageId);
-   RestoreUtil restorer = new RestoreUtil(conf, imageFs);
-   BackupManifest manifest = imageFs.getManifest(sTable);
-
-   Path tableBackupPath = imageFs.getTableBackupPath(sTable);
-
-   // todo: convert feature will be provided in a future jira
-   boolean converted = false;
-
-   boolean fullOrConverted =
-       manifest.getType().equals(BackupRestoreConstants.BACKUP_TYPE_FULL) || converted;
-   if (!fullOrConverted) {
-     // incremental backup
-     String logBackupDir =
-         HBackupFileSystem.getLogBackupDir(image.getRootDir(), image.getBackupId());
-     LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from incremental backup image "
-         + logBackupDir);
-     restorer.incrementalRestoreTable(logBackupDir, new String[] { sTable },
-       new String[] { tTable });
-   } else {
-     String kind = converted ? "converted" : "full";
-     LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from "
-         + kind + " backup image " + tableBackupPath.toString());
-     restorer.fullRestoreTable(tableBackupPath, sTable, tTable, converted);
-   }
-
-   LOG.info(sTable + " has been successfully restored to " + tTable);
- }
-
- /**
-  * Replaces the configuration used by the static methods of this class.
-  * @param newConf the configuration to use from now on
-  */
- public static synchronized void setConf(Configuration newConf) {
-   conf = newConf;
- }
-
- /**
-  * Returns the configuration of this class. If none has been set yet, a Hadoop
-  * {@link Configuration} merged with the HBase configuration is created and cached.
-  * <p>
-  * The whole method holds the class lock, the same lock {@code setConf} uses, so the
-  * null check and the assignment cannot interleave with another initialization or with
-  * {@code setConf}.
-  * @return the configuration in use, never null
-  */
- protected static synchronized Configuration getConf() {
-   if (conf == null) {
-     // build the merged configuration completely before publishing it
-     Configuration merged = new Configuration();
-     HBaseConfiguration.merge(merged, HBaseConfiguration.create());
-     conf = merged;
-   }
-   return conf;
- }
-
- /**
-  * Switches off the log4j loggers of ZooKeeper and of several HBase packages so that their
-  * messages do not mix with the command output.
-  */
- private static void disableUselessLoggers() {
-   // zookeeper
-   Logger zk = Logger.getLogger("org.apache.zookeeper");
-   LOG.debug("Zookeeper log level before set: " + zk.getLevel());
-   zk.setLevel(Level.OFF);
-   LOG.debug("Zookeeper log level after set: " + zk.getLevel());
-
-   // hbase zookeeper tool
-   Logger hbaseZk = Logger.getLogger("org.apache.hadoop.hbase.zookeeper");
-   LOG.debug("HBase zookeeper log level before set: " + hbaseZk.getLevel());
-   hbaseZk.setLevel(Level.OFF);
-   LOG.debug("HBase Zookeeper log level after set: " + hbaseZk.getLevel());
-
-   // hbase client
-   Logger hbaseClient = Logger.getLogger("org.apache.hadoop.hbase.client");
-   LOG.debug("HBase client log level before set: " + hbaseClient.getLevel());
-   hbaseClient.setLevel(Level.OFF);
-   LOG.debug("HBase client log level after set: " + hbaseClient.getLevel());
-
-   // remaining packages are silenced without reporting their previous level
-   String[] otherLoggerNames = {
-     "org.apache.hadoop.hbase.io.hfile",
-     "org.apache.hadoop.hbase.util",
-     "org.apache.hadoop.hbase.mapreduce"
-   };
-   for (String loggerName : otherLoggerNames) {
-     Logger.getLogger(loggerName).setLevel(Level.OFF);
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java
deleted file mode 100644
index bdb7988..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java
+++ /dev/null
@@ -1,503 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.NavigableSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-
-/**
- * A collection of methods used by multiple classes to restore HBase tables.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RestoreUtil {
-
- // Logger of this class.
- public static final Log LOG = LogFactory.getLog(RestoreUtil.class);
-
- // Configuration handed to the constructor; used for file system, connection and WAL reader
- // creation in this class.
- protected Configuration conf = null;
-
- // Accessor for the backup image handed to the constructor (paths, descriptors, region lists).
- protected HBackupFileSystem hBackupFS = null;
-
- // store table name and snapshot dir mapping
- // (the value stored by restoreTableAndCreate is hBackupFS.getTableInfoPath(tableName))
- private final HashMap<String, Path> snapshotMap = new HashMap<String, Path>();
-
- /**
-  * Creates a restore helper bound to one backup image.
-  * @param conf configuration used for all cluster and file system access
-  * @param hBackupFS accessor of the backup image to restore from
-  * @throws IOException declared in the signature; the constructor body only stores its arguments
-  */
- public RestoreUtil(Configuration conf, HBackupFileSystem hBackupFS) throws IOException {
-   this.hBackupFS = hBackupFS;
-   this.conf = conf;
- }
-
- /**
-  * Restores tables from an incremental backup image. After verifying that every target table
-  * exists, the work is delegated to the {@code IncrementalRestoreService} obtained from
-  * {@code BackupRestoreServiceFactory}.
-  * @param logDir incremental backup folder, which contains the WALs
-  * @param tableNames source table names (the tables that were backed up)
-  * @param newTableNames target table names (the tables to be restored to), index-aligned with
-  *          {@code tableNames}
-  * @throws IOException if the array lengths differ, a target table does not exist, or the
-  *           restore service fails
-  */
- public void incrementalRestoreTable(String logDir, String[] tableNames, String[] newTableNames)
-     throws IOException {
-
-   if (tableNames.length != newTableNames.length) {
-     throw new IOException("Number of source tables and target tables does not match!");
-   }
-
-   // for incremental backup image, expect the table already created either by user or previous
-   // full backup. Here, check that all new tables exist
-   Connection conn = ConnectionFactory.createConnection(conf);
-   try {
-     HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
-     try {
-       for (String tableName : newTableNames) {
-         if (!admin.tableExists(TableName.valueOf(tableName))) {
-           // the admin is closed exactly once, by the finally block below
-           throw new IOException("HBase table " + tableName
-               + " does not exist. Create the table first, e.g. by restoring a full backup.");
-         }
-       }
-       IncrementalRestoreService restoreService =
-           BackupRestoreServiceFactory.getIncrementalRestoreService(conf);
-
-       restoreService.run(logDir, tableNames, newTableNames);
-     } finally {
-       admin.close();
-     }
-   } finally {
-     // nested finally: the connection is released even if closing the admin throws
-     conn.close();
-   }
- }
-
- /**
-  * Restores a table from a full backup image; thin public wrapper around
-  * {@code restoreTableAndCreate}.
-  * @param tableBackupPath path of the table's backup
-  * @param tableName source table name
-  * @param newTableName target table name
-  * @param converted whether the image is a converted one
-  * @throws IOException if the restore fails
-  */
- public void fullRestoreTable(Path tableBackupPath, String tableName, String newTableName,
-     boolean converted) throws IOException {
-   // note the different parameter order of the private worker
-   this.restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted);
- }
-
- /**
-  * Restores one table from a full backup image: determines the table descriptor, makes sure the
-  * target table exists (through {@code checkAndCreateTable}), bulk loads the backed-up region
-  * directories and finally replays recovered.edits, if any.
-  * @param tableName source table name
-  * @param newTableName target table name; when null or empty the source name is used
-  * @param tableBackupPath path of the table's backup
-  * @param converted whether the image is a converted one (not supported yet, only logged)
-  * @throws IOException if the backup file system cannot be accessed
-  * @throws IllegalStateException if neither a table descriptor nor an archive directory is
-  *           available, or if any step of the actual restore fails
-  */
- private void restoreTableAndCreate(String tableName, String newTableName, Path tableBackupPath,
-     boolean converted) throws IOException {
-   if (newTableName == null || newTableName.equals("")) {
-     newTableName = tableName;
-   }
-
-   FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
-
-   // get table descriptor first
-   HTableDescriptor tableDescriptor = null;
-
-   Path tableSnapshotPath = hBackupFS.getTableSnapshotPath(tableName);
-
-   if (fileSys.exists(tableSnapshotPath)) {
-     // snapshot path exist means the backup path is in HDFS
-     // check whether snapshot dir already recorded for target table
-     if (snapshotMap.get(tableName) != null) {
-       SnapshotDescription desc =
-           SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
-       SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
-       tableDescriptor = manifest.getTableDescriptor();
-       LOG.debug("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString()
-           + " while tableName = " + tableName);
-     } else {
-       tableDescriptor = hBackupFS.getTableDesc(tableName);
-       LOG.debug("tableSnapshotPath=" + tableSnapshotPath.toString());
-       snapshotMap.put(tableName, hBackupFS.getTableInfoPath(tableName));
-     }
-     if (tableDescriptor == null) {
-       LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
-     }
-   } else if (converted) {
-     // first check if this is a converted backup image
-     LOG.error("convert will be supported in a future jira");
-   }
-
-   Path tableArchivePath = hBackupFS.getTableArchivePath(tableName);
-   if (tableArchivePath == null) {
-     if (tableDescriptor != null) {
-       // find table descriptor but no archive dir means the table is empty, create table and exit
-       LOG.debug("find table descriptor but no archive dir for table " + tableName
-           + ", will only create table");
-       tableDescriptor.setName(Bytes.toBytes(newTableName));
-       checkAndCreateTable(tableBackupPath, tableName, newTableName, null, tableDescriptor);
-       return;
-     } else {
-       throw new IllegalStateException("Cannot restore hbase table " + tableName
-           + " because neither a table descriptor nor an archive directory"
-           + " was found in the backup image.");
-     }
-   }
-
-   // from here on tableArchivePath is known to be non-null
-   if (tableDescriptor == null) {
-     tableDescriptor = new HTableDescriptor(newTableName);
-   } else {
-     tableDescriptor.setName(Bytes.toBytes(newTableName));
-   }
-
-   if (!converted) {
-     // record all region dirs:
-     // load all files in dir
-     try {
-       ArrayList<Path> regionPathList = hBackupFS.getRegionList(tableName);
-
-       // should only try to create the table with all region informations, so we could pre-split
-       // the regions in fine grain
-       checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
-         tableDescriptor);
-
-       // start real restore through bulkload
-       // if the backup target is on local cluster, special action needed
-       Path tempTableArchivePath = hBackupFS.checkLocalAndBackup(tableArchivePath);
-       if (tempTableArchivePath.equals(tableArchivePath)) {
-         LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-       } else {
-         regionPathList = hBackupFS.getRegionList(tempTableArchivePath); // point to the tempDir
-         LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
-       }
-
-       LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-       for (Path regionPath : regionPathList) {
-         String regionName = regionPath.toString();
-         LOG.debug("Restoring HFiles from directory " + regionName);
-         String[] args = { regionName, newTableName };
-         loader.run(args);
-       }
-
-       // restore the recovered.edits if exists
-       replayRecoveredEditsIfAny(tableBackupPath, tableName, tableDescriptor);
-     } catch (Exception e) {
-       throw new IllegalStateException("Cannot restore hbase table", e);
-     }
-   } else {
-     LOG.debug("convert will be supported in a future jira");
-   }
- }
-
- /**
-  * Replays the recovered.edits files found in the backed-up region directories of a table
-  * against the restore target table. A region without recovered.edits is skipped; the
-  * remaining regions are still processed.
-  * @param tableBackupPath path of the table's backup, used to resolve the file system
-  * @param tableName source table name
-  * @param newTableHtd descriptor of the target table
-  * @throws IOException if replaying an edits file fails and "hbase.skip.errors" is not set
-  */
- private void replayRecoveredEditsIfAny(Path tableBackupPath, String tableName,
-     HTableDescriptor newTableHtd) throws IOException {
-
-   LOG.debug("Trying to replay the recovered.edits if exist to the target table "
-       + newTableHtd.getNameAsString() + " from the backup of table " + tableName + ".");
-
-   FileSystem fs = tableBackupPath.getFileSystem(this.conf);
-   ArrayList<Path> regionDirs = hBackupFS.getRegionList(tableName);
-
-   if (regionDirs == null || regionDirs.size() == 0) {
-     LOG.warn("No recovered.edits to be replayed for empty backup of table " + tableName + ".");
-     return;
-   }
-
-   Connection conn = null;
-   try {
-
-     conn = ConnectionFactory.createConnection(conf);
-
-     for (Path regionDir : regionDirs) {
-       NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir);
-
-       if (files == null || files.isEmpty()) {
-         LOG.warn("No recovered.edits found for the region " + regionDir.getName() + ".");
-         // only this region has nothing to replay; go on with the next one
-         continue;
-       }
-
-       for (Path edits : files) {
-         if (edits == null || !fs.exists(edits)) {
-           LOG.warn("Null or non-existent edits file: " + edits);
-           continue;
-         }
-
-         HTable table = null;
-         try {
-           table = (HTable) conn.getTable(newTableHtd.getTableName());
-           replayRecoveredEdits(table, fs, edits);
-           table.flushCommits();
-         } catch (IOException e) {
-           boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-           if (skipErrors) {
-             Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
-             LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
-                 + "=true so continuing. Renamed " + edits + " as " + p, e);
-           } else {
-             throw e;
-           }
-         } finally {
-           // single place where the table is closed, on success as well as on failure
-           if (table != null) {
-             table.close();
-           }
-         }
-       } // for each edit file under a region
-     } // for each region
-
-   } finally {
-     if (conn != null) {
-       conn.close();
-     }
-   }
- }
-
- /**
-  * Applies the cells of one WAL edit to the target table. Consecutive cells of the same row and
-  * the same type are aggregated into a single Put or Delete; cells of the WAL meta family are
-  * ignored.
-  * @param htable the target table of the restore
-  * @param key WAL key of the entry (not read by this method)
-  * @param val the edit holding the cells to apply
-  * @throws IOException if applying a mutation fails
-  */
- private void restoreEdit(HTable htable, WALKey key, WALEdit val) throws IOException {
-   Put put = null;
-   Delete del = null;
-   Cell lastKV = null;
-   for (Cell kv : val.getCells()) {
-     // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
-     if (WALEdit.isMetaEditFamily(CellUtil.cloneFamily(kv))) {
-       continue;
-     }
-
-     // A WALEdit may contain multiple operations (HBASE-3584) and/or
-     // multiple rows (HBASE-5229).
-     // Aggregate as much as possible into a single Put/Delete
-     // operation before apply the action to the table.
-     if (lastKV == null || lastKV.getTypeByte() != kv.getTypeByte()
-         || !CellUtil.matchingRow(lastKV, kv)) {
-       // row or type changed, write out aggregate KVs.
-       // Each pending mutation is cleared once written so that it is not applied a second
-       // time at the next row/type change or by the residual write below.
-       if (put != null) {
-         applyAction(htable, put);
-         put = null;
-       }
-       if (del != null) {
-         applyAction(htable, del);
-         del = null;
-       }
-
-       if (CellUtil.isDelete(kv)) {
-         del = new Delete(CellUtil.cloneRow(kv));
-       } else {
-         put = new Put(CellUtil.cloneRow(kv));
-       }
-     }
-     if (CellUtil.isDelete(kv)) {
-       del.addDeleteMarker(kv);
-     } else {
-       put.add(kv);
-     }
-     lastKV = kv;
-   }
-   // write residual KVs
-   if (put != null) {
-     applyAction(htable, put);
-   }
-   if (del != null) {
-     applyAction(htable, del);
-   }
- }
-
- /**
-  * Writes a single mutation to the table. The mutation is copied first; a Put is written with
-  * {@code Durability.SKIP_WAL}, a Delete is written as is.
-  * @param table the table to write to
-  * @param action the Put or Delete to apply
-  * @throws IOException if the write fails
-  * @throws IllegalArgumentException if the action is neither a Put nor a Delete
-  */
- private void applyAction(HTable table, Mutation action) throws IOException {
-   // The actions are not immutable, so we defensively copy them
-   if (action instanceof Put) {
-     Put copy = new Put((Put) action);
-     // why do not we do WAL?
-     copy.setDurability(Durability.SKIP_WAL);
-     table.put(copy);
-     return;
-   }
-   if (action instanceof Delete) {
-     Delete copy = new Delete((Delete) action);
-     table.delete(copy);
-     return;
-   }
-   throw new IllegalArgumentException("action must be either Delete or Put");
- }
-
- /**
-  * Reads every entry of one edits file and replays it into the target table.
-  * <p>
-  * An {@link EOFException} while reading, or an {@link IOException} whose cause is a
-  * {@code ParseException}, is logged as a warning and the file is moved aside through
-  * {@link WALSplitter#moveAsideBadEditsFile}; any other {@link IOException} is rethrown so the
-  * caller can retry. The reader is always closed.
-  * @param htable The target table of restore
-  * @param fs File system
-  * @param edits Recovered.edits to be replayed
-  * @throws IOException on a read or write failure that is not treated as a bad file
-  */
- private void replayRecoveredEdits(HTable htable, FileSystem fs, Path edits) throws IOException {
-   LOG.debug("Replaying edits from " + edits + "; path=" + edits);
-
-   WAL.Reader walReader = null;
-   try {
-     walReader = WALFactory.createReader(fs, edits, this.conf);
-     long replayed = 0;
-
-     try {
-       for (WAL.Entry e = walReader.next(); e != null; e = walReader.next()) {
-         restoreEdit(htable, e.getKey(), e.getEdit());
-         replayed++;
-       }
-       LOG.debug(replayed + " edits from " + edits + " have been replayed.");
-
-     } catch (EOFException eof) {
-       Path movedTo = WALSplitter.moveAsideBadEditsFile(fs, edits);
-       LOG.warn("Encountered EOF. Most likely due to Master failure during "
-           + "log spliting, so we have this data in another edit. "
-           + "Continuing, but renaming " + edits + " as " + movedTo, eof);
-     } catch (IOException ioe) {
-       // Only a bad file format is handled here: it fails the same way on every attempt,
-       // so retrying cannot help. Other IO errors may be transient (bad network
-       // connection, checksum exception on one datanode, etc): throw & retry.
-       if (!(ioe.getCause() instanceof ParseException)) {
-         throw ioe;
-       }
-       Path movedTo = WALSplitter.moveAsideBadEditsFile(fs, edits);
-       LOG.warn("File corruption encountered! " + "Continuing, but renaming " + edits + " as "
-           + movedTo, ioe);
-     }
-   } finally {
-     if (walReader != null) {
-       walReader.close();
-     }
-   }
- }
-
- /**
- * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
- * backup.
- * @return the {@link LoadIncrementalHFiles} instance
- * @throws IOException exception
- */
- private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
- throws IOException {
- // set configuration for restore:
- // LoadIncrementalHFile needs more time
- // <name>hbase.rpc.timeout</name> <value>600000</value>
- // calculates
- Integer milliSecInMin = 60000;
- Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0);
- Integer numberOfFilesInDir =
- multipleTables ? hBackupFS.getMaxNumberOfFilesInSubDir(tableArchivePath) : hBackupFS
- .getNumberOfFilesInDir(tableArchivePath);
- Integer calculatedMillis = numberOfFilesInDir * milliSecInMin; // 1 minute per file
- Integer resultMillis = Math.max(calculatedMillis, previousMillis);
- if (resultMillis > previousMillis) {
- LOG.info("Setting configuration for restore with LoadIncrementalHFile: "
- + "hbase.rpc.timeout to " + calculatedMillis / milliSecInMin
- + " minutes, to handle the number of files in backup " + tableArchivePath);
- this.conf.setInt("hbase.rpc.timeout", resultMillis);
- }
-
- LoadIncrementalHFiles loader = null;
- try {
- loader = new LoadIncrementalHFiles(this.conf);
- } catch (Exception e1) {
- throw new IOException(e1);
- }
- return loader;
- }
-
- /**
-  * Prepare the table for bulkload, most codes copied from
-  * {@link LoadIncrementalHFiles#createTable(String, String)}
-  * <p>
-  * If the target table already exists it is used as is. Otherwise it is created from
-  * {@code htd}: without split keys when no region directories are given, or pre-split on the
-  * boundary keys generated from {@code regionDirList}. The admin and the connection are closed
-  * independently, so a failure while closing the admin does not leak the connection.
-  * @param tableBackupPath path (not consulted by this method)
-  * @param tableName table name (not consulted by this method)
-  * @param targetTableName target table name
-  * @param regionDirList region directory list; may be null or empty
-  * @param htd table descriptor
-  * @throws IOException on any failure; non-IO exceptions are wrapped
-  */
- private void checkAndCreateTable(Path tableBackupPath, String tableName, String targetTableName,
-     ArrayList<Path> regionDirList, HTableDescriptor htd) throws IOException {
-   Connection conn = null;
-   try {
-     conn = ConnectionFactory.createConnection(conf);
-     HBaseAdmin hbadmin = (HBaseAdmin) conn.getAdmin();
-     try {
-       if (hbadmin.tableExists(TableName.valueOf(targetTableName))) {
-         LOG.info("Using exising target table '" + targetTableName + "'");
-       } else {
-         LOG.info("Creating target table '" + targetTableName + "'");
-
-         // if no region dir given, create the table and return
-         if (regionDirList == null || regionDirList.size() == 0) {
-           hbadmin.createTable(htd);
-           return;
-         }
-
-         byte[][] keys = hBackupFS.generateBoundaryKeys(regionDirList);
-
-         // create table using table descriptor and region boundaries
-         hbadmin.createTable(htd, keys);
-       }
-     } finally {
-       hbadmin.close();
-     }
-   } catch (IOException e) {
-     // already the declared type, no need to wrap it a second time
-     throw e;
-   } catch (Exception e) {
-     throw new IOException(e);
-   } finally {
-     if (conn != null) {
-       conn.close();
-     }
-   }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
deleted file mode 100644
index a3b5db5..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.backup.BackupCopyService;
-import org.apache.hadoop.hbase.backup.BackupHandler;
-import org.apache.hadoop.hbase.backup.BackupUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.tools.DistCpOptions;
-/**
- * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
- * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
- * implementation. The other is copying for incremental log files, which bases on extending
- * DistCp's function with copy progress reporting to ZooKeeper implementation.
- *
- * For now this is only a wrapper. The other features such as progress and increment backup will be
- * implemented in future jira
- *
- * The progress and byte counters are plain instance fields that the inner BackupDistCp reads and
- * updates while it polls the running job.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MapReduceBackupCopyService implements BackupCopyService {
- private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class);
-
- private Configuration conf;
- // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
-
- // Accumulated progress within the whole backup process for the copy operation
- private float progressDone = 0.1f; // same starting value as INIT_PROGRESS
- private long bytesCopied = 0; // advanced by the total source length after each DistCp job
- private static float INIT_PROGRESS = 0.1f; // share of the progress scale treated as already done
-
- // The percentage of the current copy task within the whole task if multiple time copies are
- // needed. The default value is 100%, which means only 1 copy task for the whole.
- private float subTaskPercntgInWholeTask = 1f;
-
- public MapReduceBackupCopyService() {
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Get the current copy task percentage within the whole task if multiple copies are needed.
- * @return the current copy task percentage
- */
- public float getSubTaskPercntgInWholeTask() {
- return subTaskPercntgInWholeTask;
- }
-
- /**
- * Set the current copy task percentage within the whole task if multiple copies are needed. Must
- * be called before calling
- * {@link #copy(BackupHandler, Configuration, Type, String[])}
- * @param subTaskPercntgInWholeTask The percentage of the copy subtask
- */
- public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
- this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
- }
-
- class SnapshotCopy extends ExportSnapshot { // ExportSnapshot that also remembers handler and table
- private BackupHandler backupHandler;
- private String table;
-
- public SnapshotCopy(BackupHandler backupHandler, String table) {
- super();
- this.backupHandler = backupHandler;
- this.table = table;
- }
-
- public BackupHandler getBackupHandler() {
- return this.backupHandler;
- }
-
- public String getTable() {
- return this.table;
- }
- }
-
- // Extends DistCp for progress updating to hbase:backup
- // during backup. Using DistCpV2 (MAPREDUCE-2765).
- // Simply extend it and override execute() method to get the
- // Job reference for progress updating.
- // Only the argument "src1, [src2, [...]] dst" is supported,
- // no more DistCp options.
- class BackupDistCp extends DistCp {
-
- private BackupHandler backupHandler;
-
- public BackupDistCp(Configuration conf, DistCpOptions options, BackupHandler backupHandler)
- throws Exception {
- super(conf, options);
- this.backupHandler = backupHandler;
- }
-
- @Override
- public Job execute() throws Exception {
-
- // reflection preparation for private methods and fields
- Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
- Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
- Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
- Method methodCreateInputFileListing =
- classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
- Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
-
- Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
- Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
- Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
- Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
-
- methodCreateMetaFolderPath.setAccessible(true);
- methodCreateJob.setAccessible(true);
- methodCreateInputFileListing.setAccessible(true);
- methodCleanup.setAccessible(true);
-
- fieldInputOptions.setAccessible(true);
- fieldMetaFolder.setAccessible(true);
- fieldJobFS.setAccessible(true);
- fieldSubmitted.setAccessible(true);
-
- // execute() logic starts here
- assert fieldInputOptions.get(this) != null;
- assert getConf() != null;
-
- Job job = null;
- try {
- synchronized (this) {
- // Don't cleanup while we are setting up.
- fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
- fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf()));
-
- job = (Job) methodCreateJob.invoke(this);
- }
- methodCreateInputFileListing.invoke(this, job);
-
- // Get the total length of the source files
- List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
- long totalSrcLgth = 0;
- for (Path aSrc : srcs) {
- totalSrcLgth += BackupUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
- }
-
- // submit the copy job
- job.submit();
- fieldSubmitted.set(this, true); // checked in the finally block below to skip cleanup
-
- // after submit the MR job, set its handler in backup handler for cancel process
- // this.backupHandler.copyJob = job;
-
- // Update the copy progress to ZK every 0.5s if progress value changed
- int progressReportFreq =
- this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
- float lastProgress = progressDone;
- while (!job.isComplete()) {
- float newProgress =
- progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
-
- if (newProgress > lastProgress) { // report only when the value moved forward
-
- BigDecimal progressData =
- new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
- String newProgressStr = progressData + "%";
- LOG.info("Progress: " + newProgressStr);
- this.backupHandler.updateProgress(newProgressStr, bytesCopied);
- LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
- + ".\"");
- lastProgress = newProgress;
- }
- Thread.sleep(progressReportFreq); // value is passed to sleep as milliseconds
- }
-
- // update the progress data after copy job complete
- float newProgress =
- progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
- BigDecimal progressData =
- new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
-
- String newProgressStr = progressData + "%";
- LOG.info("Progress: " + newProgressStr);
-
- // accumulate the overall backup progress
- progressDone = newProgress;
- bytesCopied += totalSrcLgth;
-
- this.backupHandler.updateProgress(newProgressStr, bytesCopied);
- LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
- + " - " + bytesCopied + " bytes copied.\"");
-
- } finally {
- if (!fieldSubmitted.getBoolean(this)) { // clean up only when the job was never submitted
- methodCleanup.invoke(this);
- }
- }
-
- String jobID = job.getJobID().toString();
- job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
-
- LOG.debug("DistCp job-id: " + jobID);
- return job;
- }
-
- }
-
- /**
- * Do backup copy based on different types.
- * @param handler The backup handler reference
- * @param conf The hadoop configuration
- * @param copyType The backup copy type
- * @param options Options for customized ExportSnapshot or DistCp
- * @return the value returned by the copy tool's run(options); 0 when copyType is neither FULL
- * nor INCREMENTAL
- * @throws IOException wrapping any exception raised while setting up or running the copy
- */
- public int copy(BackupHandler handler, Configuration conf, BackupCopyService.Type copyType,
- String[] options) throws IOException {
-
- int res = 0;
-
- try {
- if (copyType == Type.FULL) {
- SnapshotCopy snapshotCp =
- new SnapshotCopy(handler, handler.getBackupContext().getTableBySnapshot(options[1]));
- LOG.debug("Doing SNAPSHOT_COPY");
- // Make a new instance of conf to be used by the snapshot copy class.
- snapshotCp.setConf(new Configuration(conf));
- res = snapshotCp.run(options);
- } else if (copyType == Type.INCREMENTAL) {
- LOG.debug("Doing COPY_TYPE_DISTCP");
- setSubTaskPercntgInWholeTask(1f);
-
- BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, handler); // NOTE(review): options are null here; presumably filled in by run(options) - confirm
- // Handle a special case where the source file is a single file.
- // In this case, distcp will not create the target dir. It just take the
- // target as a file name and copy source file to the target (as a file name).
- // We need to create the target dir before run distcp.
- LOG.debug("DistCp options: " + Arrays.toString(options));
- if (options.length == 2) {
- Path dest = new Path(options[1]);
- FileSystem destfs = dest.getFileSystem(conf);
- if (!destfs.exists(dest)) {
- destfs.mkdirs(dest);
- }
- }
-
- res = distcp.run(options);
- }
- return res;
-
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
deleted file mode 100644
index deefbf7..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class MapReduceRestoreService implements IncrementalRestoreService {
-  public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);
-
-  // The WALPlayer tool that performs the replay; it also holds this service's configuration.
-  private WALPlayer player;
-
-  public MapReduceRestoreService() {
-    this.player = new WALPlayer();
-  }
-
-  /**
-   * Replays the WAL files under {@code logDir} by running {@link WALPlayer} with the arguments
-   * {@code logDir}, the joined source table names and the joined target table names.
-   * @param logDir directory holding the log files of the incremental backup
-   * @param tableNames tables the edits were recorded for
-   * @param newTableNames tables the edits are replayed into
-   * @throws IOException if the player throws (the original exception is kept as the cause) or
-   *           finishes with a non-zero exit code
-   */
-  @Override
-  public void run(String logDir, String[] tableNames, String[] newTableNames) throws IOException {
-    String tableStr = HBackupFileSystem.join(tableNames);
-    String newTableStr = HBackupFileSystem.join(newTableNames);
-
-    // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each
-    // log file
-
-    String[] playerArgs = { logDir, tableStr, newTableStr };
-    LOG.info("Restore incremental backup from directory " + logDir + " from hbase tables "
-        + tableStr + " to tables " + newTableStr);
-
-    int exitCode;
-    try {
-      exitCode = player.run(playerArgs);
-    } catch (Exception e) {
-      // keep the cause so the real failure and its stack trace are not lost
-      throw new IOException("cannot restore from backup directory " + logDir
-          + " (check Hadoop and HBase logs) " + e, e);
-    }
-    if (exitCode != 0) {
-      // a failed job is reported through the exit code, not through an exception
-      throw new IOException("cannot restore from backup directory " + logDir
-          + " (check Hadoop and HBase logs): WALPlayer exited with code " + exitCode);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return player.getConf();
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.player.setConf(conf);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
deleted file mode 100644
index 4712548..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.master;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.backup.BackupSystemTable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-
-
-
-
-/**
- * Implementation of a log cleaner that checks if a log is still scheduled for
- * incremental backup before deleting it when its TTL is over.
- */
-@InterfaceStability.Evolving
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class BackupLogCleaner extends BaseLogCleanerDelegate {
- private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class);
-
- private boolean stopped = false;
-
- public BackupLogCleaner() {
- }
-
- @Override
- public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
- // all members of this class are null if backup is disabled,
- // so we cannot filter the files
- if (this.getConf() == null) {
- return files;
- }
-
- try {
- final BackupSystemTable table = BackupSystemTable.getTable(getConf());
- // If we do not have recorded backup sessions
- if (table.hasBackupSessions() == false) {
- return files;
- }
- return Iterables.filter(files, new Predicate<FileStatus>() {
- @Override
- public boolean apply(FileStatus file) {
- try {
- String wal = file.getPath().toString();
- boolean logInSystemTable = table.checkWALFile(wal);
- if (LOG.isDebugEnabled()) {
- if (logInSystemTable) {
- LOG.debug("Found log file in hbase:backup, deleting: " + wal);
- } else {
- LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal);
- }
- }
- return logInSystemTable;
- } catch (IOException e) {
- LOG.error(e);
- return false;// keep file for a while, HBase failed
- }
- }
- });
- } catch (IOException e) {
- LOG.error("Failed to get hbase:backup table, therefore will keep all files", e);
- // nothing to delete
- return new ArrayList<FileStatus>();
- }
-
- }
-
- @Override
- public void setConf(Configuration config) {
- // If backup is disabled, keep all members null
- if (!config.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) {
- LOG.warn("Backup is disabled - allowing all wals to be deleted");
- return;
- }
- super.setConf(config);
- }
-
- @Override
- public void stop(String why) {
- if (this.stopped) {
- return;
- }
- this.stopped = true;
- LOG.info("Stopping BackupLogCleaner");
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
deleted file mode 100644
index f96682f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.master;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
-import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.MetricsMaster;
-import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
-import org.apache.hadoop.hbase.procedure.Procedure;
-import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
-import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.zookeeper.KeeperException;
-
-public class LogRollMasterProcedureManager extends MasterProcedureManager {
-
- public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
- public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
- private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
-
- private MasterServices master;
- private ProcedureCoordinator coordinator;
- private boolean done;
-
- @Override
- public void stop(String why) {
- LOG.info("stop: " + why);
- }
-
- @Override
- public boolean isStopped() {
- return false;
- }
-
- @Override
- public void initialize(MasterServices master, MetricsMaster metricsMaster)
- throws KeeperException, IOException, UnsupportedOperationException {
- this.master = master;
- this.done = false;
-
- // setup the default procedure coordinator
- String name = master.getServerName().toString();
- ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
- BaseCoordinatedStateManager coordManager =
- (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
- .getCoordinatedStateManager(master.getConfiguration());
- coordManager.initialize(master);
-
- ProcedureCoordinatorRpcs comms =
- coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
-
- this.coordinator = new ProcedureCoordinator(comms, tpool);
- }
-
- @Override
- public String getProcedureSignature() {
- return ROLLLOG_PROCEDURE_SIGNATURE;
- }
-
- @Override
- public void execProcedure(ProcedureDescription desc) throws IOException {
- this.done = false;
- // start the process on the RS
- ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
- List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
- List<String> servers = new ArrayList<String>();
- for (ServerName sn : serverNames) {
- servers.add(sn.toString());
- }
- Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
- if (proc == null) {
- String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
- LOG.error(msg);
- throw new IOException(msg);
- }
-
- try {
- // wait for the procedure to complete. A timer thread is kicked off that should cancel this
- // if it takes too long.
- proc.waitForCompleted();
- LOG.info("Done waiting - exec procedure for " + desc.getInstance());
- LOG.info("Distributed roll log procedure is successful!");
- this.done = true;
- } catch (InterruptedException e) {
- ForeignException ee =
- new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
- monitor.receive(ee);
- Thread.currentThread().interrupt();
- } catch (ForeignException e) {
- ForeignException ee =
- new ForeignException("Exception while waiting for roll log procdure to finish", e);
- monitor.receive(ee);
- }
- monitor.rethrowException();
- }
-
- @Override
- public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
- return done;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
deleted file mode 100644
index 618748e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.regionserver;
-
-import java.util.HashMap;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.backup.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
-import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
-import org.apache.hadoop.hbase.procedure.ProcedureMember;
-import org.apache.hadoop.hbase.procedure.Subprocedure;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-
-
-/**
- * This backup subprocedure implementation forces a log roll on the RS.
- * <p>
- * The roll itself runs as an {@link RSRollLogTask} on the supplied
- * {@link LogRollBackupSubprocedurePool}; {@link #insideBarrier()} submits the task and blocks
- * until the pool reports that it has finished.
- */
-public class LogRollBackupSubprocedure extends Subprocedure {
-  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
-
-  private final RegionServerServices rss;
-  private final LogRollBackupSubprocedurePool taskManager;
-
-  /**
-   * @param rss services of the region server whose WAL is rolled
-   * @param member procedure member this subprocedure runs under
-   * @param errorListener dispatcher handed to the {@link Subprocedure} base class
-   * @param wakeFrequency passed through to the {@link Subprocedure} base class
-   * @param timeout passed through to the {@link Subprocedure} base class
-   * @param taskManager pool that executes the log roll task
-   */
-  public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
-      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
-      LogRollBackupSubprocedurePool taskManager) {
-
-    super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
-        wakeFrequency, timeout);
-    LOG.info("Constructing a LogRollBackupSubprocedure.");
-    this.rss = rss;
-    this.taskManager = taskManager;
-  }
-
-  /**
-   * Callable task that rolls the WAL and records the result in hbase:backup.
-   * TODO. We don't need a thread pool to execute roll log. This can be simplified
-   * with no use of subprocedurepool.
-   */
-  class RSRollLogTask implements Callable<Void> {
-    RSRollLogTask() {
-    }
-
-    /**
-     * Rolls the region server's WAL and stores the pre-roll file number for this host.
-     * @return always {@code null}
-     * @throws Exception whatever the WAL roll or the backup table access throws
-     */
-    @Override
-    public Void call() throws Exception {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("++ DRPC started: " + rss.getServerName());
-      }
-      // Keep the WAL reference local to this call. It is only needed here, and storing it in a
-      // field of the enclosing subprocedure would publish it from a pool thread without any
-      // synchronization.
-      FSHLog hlog = (FSHLog) rss.getWAL(null);
-      // Captured BEFORE the roll: this pre-roll number is the value that gets recorded below.
-      long filenum = hlog.getFilenum();
-
-      LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
-      hlog.rollWriter(true);
-      LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
-      // write the log number to hbase:backup.
-      BackupSystemTable table = BackupSystemTable.getTable(rss.getConfiguration());
-      // sanity check, good for testing: never replace a stored value with a smaller one
-      HashMap<String, String> serverTimestampMap = table.readRegionServerLastLogRollResult();
-      String host = rss.getServerName().getHostname();
-      String sts = serverTimestampMap.get(host);
-      if (sts != null && Long.parseLong(sts) > filenum) {
-        LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + filenum);
-        return null;
-      }
-      table.writeRegionServerLastLogRollResult(host, Long.toString(filenum));
-      // TODO: potential leak of HBase connection
-      // BackupSystemTable.close();
-      return null;
-    }
-
-  }
-
-  /**
-   * Submits a single {@link RSRollLogTask} and waits for the pool to finish it, rethrowing any
-   * error already received by the monitor before and after each step.
-   * @throws ForeignException if the monitor holds an error or the pool reports one
-   */
-  private void rolllog() throws ForeignException {
-
-    monitor.rethrowException();
-
-    taskManager.submitTask(new RSRollLogTask());
-    monitor.rethrowException();
-
-    // wait for everything to complete.
-    taskManager.waitForOutstandingTasks();
-    monitor.rethrowException();
-
-  }
-
-  @Override
-  public void acquireBarrier() throws ForeignException {
-    // do nothing, executing in inside barrier step.
-  }
-
-  /**
-   * do a log roll.
-   * @return always {@code null}; no payload is produced by the roll
-   */
-  @Override
-  public byte[] insideBarrier() throws ForeignException {
-    rolllog();
-    // FIXME
-    return null;
-  }
-
-  /**
-   * Cancel threads if they haven't finished.
-   * @param e the error that triggered the cleanup; forwarded to the pool's abort
-   */
-  @Override
-  public void cleanup(Exception e) {
-    taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
-  }
-
-  /**
-   * Nothing to release: all of the work is done in {@link #insideBarrier()}.
-   */
-  public void releaseBarrier() {
-    // NO OP
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
deleted file mode 100644
index 1ca638c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.regionserver;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.DaemonThreadFactory;
-import org.apache.hadoop.hbase.errorhandling.ForeignException;
-
-/**
- * Handle running each of the individual tasks for completing a backup procedure
- * on a regionserver.
- * <p>
- * Tasks are handed to a private {@link ThreadPoolExecutor}; the futures of all submitted tasks are
- * remembered so that {@link #waitForOutstandingTasks()} can block on them.
- */
-public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
-  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
-
-  /** Conf key for the maximum pool size used for backup tasks on a regionserver */
-  private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
-  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
-
-  private final ExecutorCompletionService<Void> taskPool;
-  private final ThreadPoolExecutor executor;
-  private volatile boolean aborted;
-  private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
-  private final String name;
-
-  /**
-   * @param name used in the pool's thread names and as the source of wrapped
-   *          {@link ForeignException}s
-   * @param conf supplies the keep-alive time (milliseconds) and the maximum pool size
-   */
-  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
-    // configure the executor service
-    long keepAlive =
-        conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
-          LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
-    int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
-    this.name = name;
-    // The keep-alive value comes from a *_MILLIS setting, so it must be handed over as
-    // milliseconds; TimeUnit.SECONDS would stretch it by a factor of 1000.
-    // NOTE(review): with an unbounded work queue a ThreadPoolExecutor does not grow beyond its
-    // core size (1 here), so 'threads' looks like it never takes effect - confirm the intent.
-    executor =
-        new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
-                + ")-backup-pool"));
-    taskPool = new ExecutorCompletionService<Void>(executor);
-  }
-
-  /**
-   * Submit a task to the pool.
-   * @param task work to run; its future is tracked for {@link #waitForOutstandingTasks()}
-   */
-  public void submitTask(final Callable<Void> task) {
-    Future<Void> f = this.taskPool.submit(task);
-    futures.add(f);
-  }
-
-  /**
-   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
-   * @return <tt>true</tt> on success, <tt>false</tt> if the wait was interrupted while this pool
-   *         was not aborted (the thread's interrupt flag is restored in that case)
-   * @throws ForeignException if a task failed, or if the wait was interrupted after an abort
-   */
-  public boolean waitForOutstandingTasks() throws ForeignException {
-    LOG.debug("Waiting for backup procedure to finish.");
-
-    try {
-      for (Future<Void> f : futures) {
-        f.get();
-      }
-      return true;
-    } catch (InterruptedException e) {
-      if (aborted) {
-        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
-            e);
-      }
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException e) {
-      // surface a task's own ForeignException unchanged; wrap anything else under this pool's name
-      if (e.getCause() instanceof ForeignException) {
-        throw (ForeignException) e.getCause();
-      }
-      throw new ForeignException(name, e.getCause());
-    } finally {
-      // close off remaining tasks
-      for (Future<Void> f : futures) {
-        if (!f.isDone()) {
-          f.cancel(true);
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
-   * finish
-   */
-  @Override
-  public void close() {
-    executor.shutdown();
-  }
-
-  /**
-   * Marks this pool as aborted and shuts the executor down immediately. Calls made after the
-   * pool is already marked aborted return without doing anything.
-   * @param why reason for the abort, included in the warning log
-   * @param e error that caused the abort, logged alongside the reason
-   */
-  @Override
-  public void abort(String why, Throwable e) {
-    if (this.aborted) {
-      return;
-    }
-
-    this.aborted = true;
-    LOG.warn("Aborting because: " + why, e);
-    this.executor.shutdownNow();
-  }
-
-  @Override
-  public boolean isAborted() {
-    return this.aborted;
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
deleted file mode 100644
index aca190c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.regionserver;
-
-
-import java.io.IOException;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
-import org.apache.hadoop.hbase.procedure.ProcedureMember;
-import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
-import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
-import org.apache.hadoop.hbase.procedure.Subprocedure;
-import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-
-/**
- * Region-server side manager for the backup log roll procedure of a {@link HRegionServer}.
- * <p>
- * It owns the {@link ProcedureMember} and its {@link ProcedureMemberRpcs}, and builds the backup
- * specific {@link Subprocedure} that this region server runs when asked to.
- * <p>
- * Lifecycle: {@link #initialize(RegionServerServices)} wires everything up, {@link #start()}
- * begins accepting requests and {@link #stop(boolean)} closes the member and its RPCs.
- */
-public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
-
-  private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
-
-  /** Conf key for number of request threads to start backup on regionservers */
-  public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
-  /** # of threads for backup work on the rs. */
-  public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
-
-  /** Conf key for the backup timeout, in milliseconds */
-  public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
-  /** Timeout used when {@link #BACKUP_TIMEOUT_MILLIS_KEY} is not configured */
-  public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
-
-  /** Conf key for millis between checks to see if backup work completed or if there are errors */
-  public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
-  /** Default amount of time to check for errors while regions finish backup work */
-  private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
-
-  private RegionServerServices rss;
-  private ProcedureMemberRpcs memberRpcs;
-  private ProcedureMember member;
-
-  /**
-   * Creates an unconfigured manager; {@link #initialize(RegionServerServices)} does the setup.
-   */
-  public LogRollRegionServerProcedureManager() {
-  }
-
-  /**
-   * Starts the member RPCs under this region server's name so backup requests can be received.
-   */
-  @Override
-  public void start() {
-    String memberName = rss.getServerName().toString();
-    memberRpcs.start(memberName, member);
-    LOG.info("Started region server backup manager.");
-  }
-
-  /**
-   * Closes the procedure member and then, regardless of the outcome, the member RPCs.
-   * @param force only changes the wording of the log message
-   * @throws IOException if closing the member or its RPCs fails
-   */
-  @Override
-  public void stop(boolean force) throws IOException {
-    String mode;
-    if (force) {
-      mode = "abruptly";
-    } else {
-      mode = "gracefully";
-    }
-    LOG.info("Stopping RegionServerBackupManager " + mode + ".");
-
-    try {
-      member.close();
-    } finally {
-      memberRpcs.close();
-    }
-  }
-
-  /**
-   * Creates the subprocedure that performs the log roll for a backup procedure.
-   * @return Subprocedure to submit to the ProcedureMember.
-   * @throws IllegalStateException if the region server is stopping or already stopped
-   */
-  public Subprocedure buildSubprocedure() {
-
-    // refuse to start new backup work once the region server is on its way down
-    boolean serverRunning = !rss.isStopping() && !rss.isStopped();
-    if (!serverRunning) {
-      throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
-          + ", because stopping/stopped!");
-    }
-
-    LOG.info("Attempting to run a roll log procedure for backup.");
-    Configuration conf = rss.getConfiguration();
-    long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
-    long wakeMillis =
-        conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
-    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
-
-    // every subprocedure gets its own task pool, named after this region server
-    String poolName = rss.getServerName().toString();
-    LogRollBackupSubprocedurePool taskManager = new LogRollBackupSubprocedurePool(poolName, conf);
-    return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
-        taskManager);
-
-  }
-
-  /**
-   * Factory handed to the {@link ProcedureMember}; delegates to the enclosing manager.
-   */
-  public class BackupSubprocedureBuilder implements SubprocedureFactory {
-
-    /**
-     * @param name ignored
-     * @param data ignored
-     * @return the result of the enclosing manager's {@code buildSubprocedure()}
-     */
-    @Override
-    public Subprocedure buildSubprocedure(String name, byte[] data) {
-      return LogRollRegionServerProcedureManager.this.buildSubprocedure();
-    }
-  }
-
-  /**
-   * Wires up the member RPCs and the procedure member for the given region server.
-   * @param rss services of the region server this manager belongs to
-   * @throws IOException if the coordinated state manager cannot provide the member RPCs
-   */
-  @Override
-  public void initialize(RegionServerServices rss) throws IOException {
-    this.rss = rss;
-    Configuration conf = rss.getConfiguration();
-
-    BaseCoordinatedStateManager coordManager =
-        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
-            .getCoordinatedStateManager(conf);
-    coordManager.initialize(rss);
-    this.memberRpcs =
-        coordManager
-            .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
-
-    // read in the backup handler configuration properties
-    long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
-    int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
-
-    // create the actual cohort member
-    String memberName = rss.getServerName().toString();
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(memberName, opThreads, keepAlive);
-    this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
-  }
-
-  /**
-   * @return the fixed signature string of this procedure manager
-   */
-  @Override
-  public String getProcedureSignature() {
-    return "backup-proc";
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index 3342743..ae36f08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,11 +17,7 @@
*/
package org.apache.hadoop.hbase.coordination;
-import java.io.IOException;
-
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
@@ -55,21 +51,8 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
* Method to retrieve coordination for split log worker
*/
public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination();
-
/**
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
- /**
- * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
- */
- public abstract ProcedureCoordinatorRpcs
- getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
-
- /**
- * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpc}
- */
- public abstract ProcedureMemberRpcs
- getProcedureMemberRpcs(String procType) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 7cf4aab..3e89be7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,15 +17,9 @@
*/
package org.apache.hadoop.hbase.coordination;
-import java.io.IOException;
-
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
-import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
-import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -55,21 +49,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
@Override
public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
return splitLogWorkerCoordination;
- }
-
+ }
@Override
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
-
- @Override
- public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
- throws IOException {
- return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
- }
-
- @Override
- public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException {
- return new ZKProcedureMemberRpcs(watcher, procType);
- }
}