Posted to commits@hbase.apache.org by el...@apache.org on 2017/08/23 16:47:28 UTC
[31/36] hbase git commit: HBASE-17614: Move Backup/Restore into
separate module (Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
new file mode 100644
index 0000000..49e8c75
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool, run as a MapReduce job, that splits HFiles at new region boundaries. The tool
+ * generates HFiles for later bulk importing.
+ */
+@InterfaceAudience.Private
+public class MapReduceHFileSplitterJob extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class);
+ final static String NAME = "HFileSplitterJob";
+ public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+ public final static String TABLES_KEY = "hfile.input.tables";
+ public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ public MapReduceHFileSplitterJob() {
+ }
+
+ protected MapReduceHFileSplitterJob(final Configuration c) {
+ super(c);
+ }
+
+ /**
+ * A mapper that just writes out cells. This one can be used together with
+ * {@link KeyValueSortReducer}.
+ */
+ static class HFileCellMapper extends
+ Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+ @Override
+ public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+ InterruptedException {
+ // Convert value to KeyValue if subclass
+ if (!value.getClass().equals(KeyValue.class)) {
+ value =
+ new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+ value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+ value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+ value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
+ value.getValueOffset(), value.getValueLength());
+ }
+ context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ // do nothing
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Configuration conf = getConf();
+ String inputDirs = args[0];
+ String tabName = args[1];
+ conf.setStrings(TABLES_KEY, tabName);
+ conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+ Job job =
+ Job.getInstance(conf,
+ conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+ job.setJarByClass(MapReduceHFileSplitterJob.class);
+ job.setInputFormatClass(HFileInputFormat.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ if (hfileOutPath != null) {
+ LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+ TableName tableName = TableName.valueOf(tabName);
+ job.setMapperClass(HFileCellMapper.class);
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputValueClass(KeyValue.class);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+ }
+ LOG.debug("success configuring load incremental job");
+
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+ } else {
+ throw new IOException("No bulk output directory specified");
+ }
+ return job;
+ }
+
+ /**
+ * Print usage
+ * @param errorMsg Error message. Can be null.
+ */
+ private void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+ System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+ System.err.println("<table> table to load.\n");
+ System.err.println("To generate HFiles for a bulk data load, pass the option:");
+ System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err.println("Other options:");
+ System.err.println(" -D " + JOB_NAME_CONF_KEY
+ + "=jobName - use the specified mapreduce job name for the HFile splitter");
+ System.err.println("For performance also consider the following options:\n"
+ + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
+ }
+
+ /**
+ * Main entry point.
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ return -1;
+ }
+ Job job = createSubmittableJob(args);
+ int result = job.waitForCompletion(true) ? 0 : 1;
+ return result;
+ }
+}
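
For reference, a minimal launch sketch for this tool. This is a sketch only: the
input directory, table name, and bulk output path below are hypothetical, and
ToolRunner injects the configuration into the Tool.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
    import org.apache.hadoop.util.ToolRunner;

    public class SplitterLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Required: where the split HFiles are written (BULK_OUTPUT_CONF_KEY).
        conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-split-out");
        // args[0] = comma-separated HFile input dirs, args[1] = target table
        int rc = ToolRunner.run(conf, new MapReduceHFileSplitterJob(),
            new String[] { "/backup/images/hfiles", "usertable" });
        System.exit(rc);
      }
    }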
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
new file mode 100644
index 0000000..1209e7c
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link RestoreJob}.
+ *
+ * For backup restore, it runs a {@link MapReduceHFileSplitterJob} that creates
+ * HFiles aligned with the region boundaries of the table being restored.
+ *
+ * The resulting HFiles are then loaded using the HBase bulk load tool
+ * {@link LoadIncrementalHFiles}.
+ */
+@InterfaceAudience.Private
+public class MapReduceRestoreJob implements RestoreJob {
+ public static final Log LOG = LogFactory.getLog(MapReduceRestoreJob.class);
+
+ private Tool player;
+ private Configuration conf;
+
+ public MapReduceRestoreJob() {
+ }
+
+ @Override
+ public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
+ boolean fullBackupRestore) throws IOException {
+
+ player = new MapReduceHFileSplitterJob();
+ String bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+ // Player reads all files in arbitrary directory structure and creates
+ // a Map task for each file
+ String dirs = StringUtils.join(dirPaths, ",");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+ + " backup from directory " + dirs + " from hbase tables "
+ + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
+ + " to tables "
+ + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
+ }
+
+ for (int i = 0; i < tableNames.length; i++) {
+
+ LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
+
+ Path bulkOutputPath =
+ BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]),
+ getConf());
+ Configuration conf = getConf();
+ conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+ String[] playerArgs =
+ {
+ dirs,
+ fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i]
+ .getNameAsString() };
+
+ int result = 0;
+ int loaderResult = 0;
+ try {
+
+ player.setConf(getConf());
+ result = player.run(playerArgs);
+ if (succeeded(result)) {
+ // do bulk load
+ LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
+ }
+ String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
+ loaderResult = loader.run(args);
+
+ if (failed(loaderResult)) {
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
+ }
+ } else {
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+ }
+ LOG.debug("Restore Job finished:" + result);
+ } catch (Exception e) {
+ LOG.error(e);
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop and HBase logs) ", e);
+ }
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+}
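
A hedged sketch of driving this restore job directly; the backup image
directory and table names below are examples only.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;

    public class RestoreDriver {
      public static void main(String[] args) throws Exception {
        MapReduceRestoreJob restore = new MapReduceRestoreJob();
        restore.setConf(HBaseConfiguration.create());
        // Split, then bulk load, each backup image into the target table.
        restore.run(new Path[] { new Path("/backup/backup_1503500000000/default/t1") },
            new TableName[] { TableName.valueOf("t1") },          // source table
            new TableName[] { TableName.valueOf("t1_restored") }, // target table
            true /* full backup restore */);
      }
    }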
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
new file mode 100644
index 0000000..b5b887c
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
+ * before deleting it when its TTL is over.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupLogCleaner extends BaseLogCleanerDelegate {
+ private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class);
+
+ private boolean stopped = false;
+ private Connection conn;
+
+ public BackupLogCleaner() {
+ }
+
+ @Override
+ public void init(Map<String, Object> params) {
+ if (params != null && params.containsKey(HMaster.MASTER)) {
+ MasterServices master = (MasterServices) params.get(HMaster.MASTER);
+ conn = master.getConnection();
+ if (getConf() == null) {
+ super.setConf(conn.getConfiguration());
+ }
+ }
+ if (conn == null) {
+ try {
+ conn = ConnectionFactory.createConnection(getConf());
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to create connection", ioe);
+ }
+ }
+ }
+
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ // all members of this class are null if backup is disabled,
+ // so we cannot filter the files
+ if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
+ LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ + " setting");
+ return files;
+ }
+
+ List<FileStatus> list = new ArrayList<FileStatus>();
+ try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+ // If we do not have recorded backup sessions
+ try {
+ if (!table.hasBackupSessions()) {
+ LOG.trace("BackupLogCleaner has no backup sessions");
+ return files;
+ }
+ } catch (TableNotFoundException tnfe) {
+ LOG.warn("backup system table is not available" + tnfe.getMessage());
+ return files;
+ }
+
+ for (FileStatus file : files) {
+ String wal = file.getPath().toString();
+ boolean logInSystemTable = table.isWALFileDeletable(wal);
+ if (logInSystemTable) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found log file in backup system table, deleting: " + wal);
+ }
+ list.add(file);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Didn't find this log in backup system table, keeping: " + wal);
+ }
+ }
+ return list;
+ } catch (IOException e) {
+ LOG.error("Failed to get backup system table table, therefore will keep all files", e);
+ // nothing to delete
+ return new ArrayList<FileStatus>();
+ }
+ }
+
+ @Override
+ public void setConf(Configuration config) {
+ // If backup is disabled, keep all members null
+ if (!config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
+ BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)) {
+ LOG.warn("Backup is disabled - allowing all wals to be deleted");
+ return;
+ }
+ super.setConf(config);
+ }
+
+ @Override
+ public void stop(String why) {
+ if (this.stopped) {
+ return;
+ }
+ this.stopped = true;
+ LOG.info("Stopping BackupLogCleaner");
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+}
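
The cleaner above is a no-op unless the backup subsystem is enabled
(BackupRestoreConstants.BACKUP_ENABLE_KEY, i.e. hbase.backup.enable). A hedged
configuration sketch, expressed programmatically for illustration; in a real
deployment these properties would be set in the master's hbase-site.xml.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class BackupCleanerConfig {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Turn the backup subsystem on; when disabled, the cleaner passes all
        // WALs through as deletable.
        conf.setBoolean("hbase.backup.enable", true);
        // Chain BackupLogCleaner after the default TTL cleaner so that WALs
        // still needed by backup sessions survive TTL-based deletion.
        conf.set("hbase.master.logcleaner.plugins",
            "org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner,"
                + "org.apache.hadoop.hbase.backup.master.BackupLogCleaner");
        return conf;
      }
    }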
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
new file mode 100644
index 0000000..47e428c
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Master procedure manager for the coordinated cluster-wide WAL roll operation, which is run
+ * during backup operations; see {@link MasterProcedureManager} and
+ * {@link RegionServerProcedureManager}.
+ */
+@InterfaceAudience.Private
+public class LogRollMasterProcedureManager extends MasterProcedureManager {
+
+ public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
+ public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
+ private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
+
+ private MasterServices master;
+ private ProcedureCoordinator coordinator;
+ private boolean done;
+
+ @Override
+ public void stop(String why) {
+ LOG.info("stop: " + why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public void initialize(MasterServices master, MetricsMaster metricsMaster)
+ throws KeeperException, IOException, UnsupportedOperationException {
+ this.master = master;
+ this.done = false;
+
+ // setup the default procedure coordinator
+ String name = master.getServerName().toString();
+ ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
+ BaseCoordinatedStateManager coordManager =
+ (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
+ .getCoordinatedStateManager(master.getConfiguration());
+ coordManager.initialize(master);
+
+ ProcedureCoordinatorRpcs comms =
+ coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
+
+ this.coordinator = new ProcedureCoordinator(comms, tpool);
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return ROLLLOG_PROCEDURE_SIGNATURE;
+ }
+
+ @Override
+ public void execProcedure(ProcedureDescription desc) throws IOException {
+ if (!isBackupEnabled()) {
+ LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ + " setting");
+ return;
+ }
+ this.done = false;
+ // start the process on the RS
+ ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+ List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+ List<String> servers = new ArrayList<String>();
+ for (ServerName sn : serverNames) {
+ servers.add(sn.toString());
+ }
+
+ List<NameStringPair> conf = desc.getConfigurationList();
+ byte[] data = new byte[0];
+ if (conf.size() > 0) {
+ // Get backup root path
+ data = conf.get(0).getValue().getBytes();
+ }
+ Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers);
+ if (proc == null) {
+ String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ try {
+ // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+ // if it takes too long.
+ proc.waitForCompleted();
+ LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+ LOG.info("Distributed roll log procedure is successful!");
+ this.done = true;
+ } catch (InterruptedException e) {
+ ForeignException ee =
+ new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
+ monitor.receive(ee);
+ Thread.currentThread().interrupt();
+ } catch (ForeignException e) {
+ ForeignException ee =
+ new ForeignException("Exception while waiting for roll log procdure to finish", e);
+ monitor.receive(ee);
+ }
+ monitor.rethrowException();
+ }
+
+ private boolean isBackupEnabled() {
+ return BackupManager.isBackupEnabled(master.getConfiguration());
+ }
+
+ @Override
+ public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+ return done;
+ }
+
+}
\ No newline at end of file
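
For context, a hedged sketch of how a client could trigger this distributed
log roll through the Admin API; the "backupRoot" property key and the path are
assumptions for illustration only.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class RollLogTrigger {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          Map<String, String> props = new HashMap<>();
          props.put("backupRoot", "hdfs://nn:8020/backup"); // assumed key and path
          // The first property value becomes the procedure payload (see
          // execProcedure above, which reads conf.get(0).getValue()).
          admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
        }
      }
    }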
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
new file mode 100644
index 0000000..8fc644c
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+
+/**
+ * This backup sub-procedure implementation forces a WAL rolling on a RS.
+ */
+@InterfaceAudience.Private
+public class LogRollBackupSubprocedure extends Subprocedure {
+ private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
+
+ private final RegionServerServices rss;
+ private final LogRollBackupSubprocedurePool taskManager;
+ private FSHLog hlog;
+ private String backupRoot;
+
+ public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
+ ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
+ LogRollBackupSubprocedurePool taskManager, byte[] data) {
+
+ super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
+ wakeFrequency, timeout);
+ LOG.info("Constructing a LogRollBackupSubprocedure.");
+ this.rss = rss;
+ this.taskManager = taskManager;
+ if (data != null) {
+ backupRoot = new String(data);
+ }
+ }
+
+ /**
+ * Callable task. TODO: we don't need a thread pool to execute a log roll; this can be
+ * simplified to not use the subprocedure pool.
+ */
+ class RSRollLogTask implements Callable<Void> {
+ RSRollLogTask() {
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("++ DRPC started: " + rss.getServerName());
+ }
+ hlog = (FSHLog) rss.getWAL(null);
+ long filenum = hlog.getFilenum();
+ List<WAL> wals = rss.getWALs();
+ long highest = -1;
+ for (WAL wal : wals) {
+ if (wal == null) continue;
+ if (((AbstractFSWAL) wal).getFilenum() > highest) {
+ highest = ((AbstractFSWAL) wal).getFilenum();
+ }
+ }
+
+ LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum
+ + " highest: " + highest + " on " + rss.getServerName());
+ ((HRegionServer) rss).getWalRoller().requestRollAll();
+ long start = EnvironmentEdgeManager.currentTime();
+ while (!((HRegionServer) rss).getWalRoller().walRollFinished()) {
+ Thread.sleep(20);
+ }
+ LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start));
+ LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum()
+ + " on " + rss.getServerName());
+
+ Connection connection = rss.getConnection();
+ try (final BackupSystemTable table = new BackupSystemTable(connection)) {
+ // sanity check, good for testing
+ HashMap<String, Long> serverTimestampMap =
+ table.readRegionServerLastLogRollResult(backupRoot);
+ String host = rss.getServerName().getHostname();
+ int port = rss.getServerName().getPort();
+ String server = host + ":" + port;
+ // look up by the same host:port key that is used when writing the result below
+ Long sts = serverTimestampMap.get(server);
+ if (sts != null && sts > highest) {
+ LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + highest);
+ return null;
+ }
+ // write the log number to backup system table.
+ table.writeRegionServerLastLogRollResult(server, highest, backupRoot);
+ return null;
+ } catch (Exception e) {
+ LOG.error(e);
+ throw e;
+ }
+ }
+ }
+
+ private void rolllog() throws ForeignException {
+ monitor.rethrowException();
+
+ taskManager.submitTask(new RSRollLogTask());
+ monitor.rethrowException();
+
+ // wait for everything to complete.
+ taskManager.waitForOutstandingTasks();
+ monitor.rethrowException();
+
+ }
+
+ @Override
+ public void acquireBarrier() throws ForeignException {
+ // do nothing, executing in inside barrier step.
+ }
+
+ /**
+ * Do a log roll.
+ * @return null; this subprocedure returns no payload
+ */
+ @Override
+ public byte[] insideBarrier() throws ForeignException {
+ rolllog();
+ return null;
+ }
+
+ /**
+ * Cancel threads if they haven't finished.
+ */
+ @Override
+ public void cleanup(Exception e) {
+ taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+ }
+
+ /**
+ * Hooray!
+ */
+ public void releaseBarrier() {
+ // NO OP
+ }
+
+}
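
On the TODO in RSRollLogTask above: a hedged sketch of the simplification it
suggests, running the task inline inside LogRollBackupSubprocedure instead of
going through the pool. The method name and error wrapping are assumptions.

    // Hypothetical drop-in replacement for rolllog(), inside the class above.
    private void rolllogInline() throws ForeignException {
      monitor.rethrowException();
      try {
        new RSRollLogTask().call(); // run on the calling thread; no pool needed
      } catch (Exception e) {
        throw new ForeignException("Log roll failed on " + rss.getServerName(), e);
      }
      monitor.rethrowException();
    }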
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
new file mode 100644
index 0000000..65a1fa3
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * Handle running each of the individual tasks for completing a backup procedure on a region
+ * server.
+ */
+@InterfaceAudience.Private
+public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
+ private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
+
+ /** Maximum number of backup region tasks that can run concurrently */
+ private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
+ private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
+
+ private final ExecutorCompletionService<Void> taskPool;
+ private final ThreadPoolExecutor executor;
+ private volatile boolean aborted;
+ private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+ private final String name;
+
+ public LogRollBackupSubprocedurePool(String name, Configuration conf) {
+ // configure the executor service
+ long keepAlive =
+ conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
+ LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
+ this.name = name;
+ executor =
+ new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
+ + ")-backup-pool"));
+ taskPool = new ExecutorCompletionService<Void>(executor);
+ }
+
+ /**
+ * Submit a task to the pool.
+ */
+ public void submitTask(final Callable<Void> task) {
+ Future<Void> f = this.taskPool.submit(task);
+ futures.add(f);
+ }
+
+ /**
+ * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
+ * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+ * @throws ForeignException exception
+ */
+ public boolean waitForOutstandingTasks() throws ForeignException {
+ LOG.debug("Waiting for backup procedure to finish.");
+
+ try {
+ for (Future<Void> f : futures) {
+ f.get();
+ }
+ return true;
+ } catch (InterruptedException e) {
+ if (aborted) {
+ throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
+ e);
+ }
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ForeignException) {
+ throw (ForeignException) e.getCause();
+ }
+ throw new ForeignException(name, e.getCause());
+ } finally {
+ // close off remaining tasks
+ for (Future<Void> f : futures) {
+ if (!f.isDone()) {
+ f.cancel(true);
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Attempt to cleanly shut down any running tasks, allowing currently running tasks to
+ * finish.
+ */
+ @Override
+ public void close() {
+ executor.shutdown();
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ if (this.aborted) {
+ return;
+ }
+
+ this.aborted = true;
+ LOG.warn("Aborting because: " + why, e);
+ this.executor.shutdownNow();
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+}
\ No newline at end of file
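
A small usage sketch of the pool; the callable body is a placeholder for the
per-RS backup work (e.g. a WAL roll).

    import java.util.concurrent.Callable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.regionserver.LogRollBackupSubprocedurePool;
    import org.apache.hadoop.hbase.errorhandling.ForeignException;

    public class PoolExample {
      public static void main(String[] args) throws ForeignException {
        Configuration conf = HBaseConfiguration.create();
        try (LogRollBackupSubprocedurePool pool =
            new LogRollBackupSubprocedurePool("rs-example", conf)) {
          pool.submitTask(new Callable<Void>() {
            @Override
            public Void call() {
              return null; // placeholder task
            }
          });
          // Rethrows task failures as ForeignException and cancels stragglers.
          pool.waitForOutstandingTasks();
        }
      }
    }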
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
new file mode 100644
index 0000000..9d5a858
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This manager class handles the work dealing with distributed WAL roll requests.
+ * <p>
+ * This provides the mechanism necessary to kick off a backup-specific {@link Subprocedure} that
+ * this region server is responsible for. If any failures occur with the sub-procedure, the
+ * manager's procedure member notifies the procedure coordinator to abort all others.
+ * <p>
+ * On startup, requires {@link #start()} to be called.
+ * <p>
+ * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be called
+ */
+@InterfaceAudience.Private
+public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
+
+ private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
+
+ /** Conf key for number of request threads to start backup on region servers */
+ public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+ /** # of threads for backup work on the rs. */
+ public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+ public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+ public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+ /** Conf key for millis between checks to see if backup work completed or if there are errors */
+ public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+ /** Default amount of time to check for errors while regions finish backup work */
+ private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+ private RegionServerServices rss;
+ private ProcedureMemberRpcs memberRpcs;
+ private ProcedureMember member;
+ private boolean started = false;
+
+ /**
+ * Create a default backup procedure manager
+ */
+ public LogRollRegionServerProcedureManager() {
+ }
+
+ /**
+ * Start accepting backup procedure requests.
+ */
+ @Override
+ public void start() {
+ if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
+ LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ + " setting");
+ return;
+ }
+ this.memberRpcs.start(rss.getServerName().toString(), member);
+ started = true;
+ LOG.info("Started region server backup manager.");
+ }
+
+ /**
+ * Close <tt>this</tt> and all running backup procedure tasks
+ * @param force forcefully stop all running tasks
+ * @throws IOException exception
+ */
+ @Override
+ public void stop(boolean force) throws IOException {
+ if (!started) {
+ return;
+ }
+ String mode = force ? "abruptly" : "gracefully";
+ LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+ try {
+ this.member.close();
+ } finally {
+ this.memberRpcs.close();
+ }
+ }
+
+ /**
+ * If in a running state, creates the specified subprocedure for handling a backup procedure.
+ * @return Subprocedure to submit to the ProcedureMember.
+ */
+ public Subprocedure buildSubprocedure(byte[] data) {
+
+ // don't run a backup if the parent is stop(ping)
+ if (rss.isStopping() || rss.isStopped()) {
+ throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+ + ", because the server is stopping or stopped!");
+ }
+
+ LOG.info("Attempting to run a roll log procedure for backup.");
+ ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+ Configuration conf = rss.getConfiguration();
+ long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ long wakeMillis =
+ conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+ LogRollBackupSubprocedurePool taskManager =
+ new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf);
+ return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
+ taskManager, data);
+
+ }
+
+ /**
+ * Build the actual backup procedure runner that will do all the 'hard' work
+ */
+ public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+ @Override
+ public Subprocedure buildSubprocedure(String name, byte[] data) {
+ return LogRollRegionServerProcedureManager.this.buildSubprocedure(data);
+ }
+ }
+
+ @Override
+ public void initialize(RegionServerServices rss) throws KeeperException {
+ this.rss = rss;
+ if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
+ LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ + " setting");
+ return;
+ }
+ BaseCoordinatedStateManager coordManager =
+ (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.
+ getCoordinatedStateManager(rss.getConfiguration());
+ coordManager.initialize(rss);
+ this.memberRpcs =
+ coordManager
+ .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+
+ // read in the backup handler configuration properties
+ Configuration conf = rss.getConfiguration();
+ long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+ // create the actual cohort member
+ ThreadPoolExecutor pool =
+ ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
+ this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return "backup-proc";
+ }
+
+}
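
To wire these managers in, both sides have to be registered with the procedure
framework. A hedged sketch, expressed programmatically for illustration; in
practice these keys would be set in hbase-site.xml on the master and region
servers respectively.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ProcedureManagerConfig {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.backup.enable", true);
        // Master-side coordinator for the roll-log procedure.
        conf.set("hbase.procedure.master.classes",
            "org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager");
        // Region-server-side member that performs the WAL roll.
        conf.set("hbase.procedure.regionserver.classes",
            "org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager");
        return conf;
      }
    }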
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
new file mode 100644
index 0000000..0da6fc4
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A backup set is a named group of HBase tables managed together by the backup/restore
+ * framework. Instead of listing tables in a backup or restore operation, one can refer to
+ * the set by name.
+ */
+@InterfaceAudience.Private
+public class BackupSet {
+ private final String name;
+ private final List<TableName> tables;
+
+ public BackupSet(String name, List<TableName> tables) {
+ this.name = name;
+ this.tables = tables;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<TableName> getTables() {
+ return tables;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name).append("={");
+ sb.append(StringUtils.join(tables, ','));
+ sb.append("}");
+ return sb.toString();
+ }
+
+}
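
A tiny usage sketch; the set and table names are hypothetical.

    import java.util.Arrays;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.util.BackupSet;

    public class BackupSetExample {
      public static void main(String[] args) {
        BackupSet nightly = new BackupSet("nightly",
            Arrays.asList(TableName.valueOf("users"), TableName.valueOf("orders")));
        System.out.println(nightly); // prints: nightly={users,orders}
      }
    }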
http://git-wip-us.apache.org/repos/asf/hbase/blob/37c65946/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
new file mode 100644
index 0000000..ce77645
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -0,0 +1,747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * A collection of methods used by multiple classes to back up HBase tables.
+ */
+@InterfaceAudience.Private
+public final class BackupUtils {
+ protected static final Log LOG = LogFactory.getLog(BackupUtils.class);
+ public static final String LOGNAME_SEPARATOR = ".";
+ public static final int MILLISEC_IN_HOUR = 3600000;
+
+ private BackupUtils() {
+ throw new AssertionError("Instantiating utility class...");
+ }
+
+ /**
+ * Loop through the RS log timestamp map for the tables and, for each RS, find the minimum
+ * timestamp value across the tables.
+ * @param rsLogTimestampMap timestamp map
+ * @return the min timestamp of each RS
+ */
+ public static HashMap<String, Long> getRSLogTimestampMins(
+ HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) {
+
+ if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
+ return null;
+ }
+
+ HashMap<String, Long> rsLogTimestampMins = new HashMap<String, Long>();
+ HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS =
+ new HashMap<String, HashMap<TableName, Long>>();
+
+ for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
+ TableName table = tableEntry.getKey();
+ HashMap<String, Long> rsLogTimestamp = tableEntry.getValue();
+ for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) {
+ String rs = rsEntry.getKey();
+ Long ts = rsEntry.getValue();
+ if (!rsLogTimestampMapByRS.containsKey(rs)) {
+ rsLogTimestampMapByRS.put(rs, new HashMap<TableName, Long>());
+ }
+ rsLogTimestampMapByRS.get(rs).put(table, ts);
+ }
+ }
+
+ for (Entry<String, HashMap<TableName, Long>> entry : rsLogTimestampMapByRS.entrySet()) {
+ String rs = entry.getKey();
+ rsLogTimestampMins.put(rs, BackupUtils.getMinValue(entry.getValue()));
+ }
+
+ return rsLogTimestampMins;
+ }
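+
+ /*
+ * Example with hypothetical data: given
+ * {t1: {rs1:16020=100, rs2:16020=250}, t2: {rs1:16020=80, rs2:16020=300}}
+ * the result is {rs1:16020=80, rs2:16020=250}, i.e. the per-RS minimum
+ * across tables.
+ */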
+
+ /**
+ * Copy out Table RegionInfo into the incremental backup image. TODO: consider moving this
+ * logic into HBackupFileSystem.
+ * @param conn connection
+ * @param backupInfo backup info
+ * @param conf configuration
+ * @throws IOException exception
+ * @throws InterruptedException exception
+ */
+ public static void
+ copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
+ throws IOException, InterruptedException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ // for each table in the table set, copy out the table info and region
+ // info files in the correct directory structure
+ for (TableName table : backupInfo.getTables()) {
+
+ if (!MetaTableAccessor.tableExists(conn, table)) {
+ LOG.warn("Table " + table + " does not exists, skipping it.");
+ continue;
+ }
+ HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table);
+
+ // write a copy of descriptor to the target directory
+ Path target = new Path(backupInfo.getTableBackupDir(table));
+ FileSystem targetFs = target.getFileSystem(conf);
+ FSTableDescriptors descriptors =
+ new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
+ descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+ LOG.debug("Attempting to copy table info for:" + table + " target: " + target
+ + " descriptor: " + orig);
+ LOG.debug("Finished copying tableinfo.");
+ List<HRegionInfo> regions = MetaTableAccessor.getTableRegions(conn, table);
+ // For each region, write the region info to disk
+ LOG.debug("Starting to write region info for table " + table);
+ for (HRegionInfo regionInfo : regions) {
+ Path regionDir =
+ HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
+ regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
+ writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
+ }
+ LOG.debug("Finished writing region info for table " + table);
+ }
+ }
+
+ /**
+ * Write the .regioninfo file on-disk.
+ */
+ public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs,
+ final Path regionInfoDir, HRegionInfo regionInfo) throws IOException {
+ final byte[] content = regionInfo.toDelimitedByteArray();
+ Path regionInfoFile = new Path(regionInfoDir, "." + HConstants.REGIONINFO_QUALIFIER_STR);
+ // First check to get the permissions
+ FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
+ // Write the RegionInfo file content
+ try (FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null)) {
+ out.write(content);
+ }
+ }
+
+ /**
+ * Parses hostname:port from WAL file path
+ * @param p path to WAL file
+ * @return hostname:port
+ */
+ public static String parseHostNameFromLogFile(Path p) {
+ try {
+ if (AbstractFSWALProvider.isArchivedLogFile(p)) {
+ return BackupUtils.parseHostFromOldLog(p);
+ } else {
+ ServerName sname = AbstractFSWALProvider.getServerNameFromWALDirectoryName(p);
+ if (sname != null) {
+ return sname.getAddress().toString();
+ } else {
+ LOG.error("Skip log file (can't parse): " + p);
+ return null;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Skip log file (can't parse): " + p, e);
+ return null;
+ }
+ }
+
+ /**
+ * Returns the unique part of a WAL file name
+ * @param walFileName WAL file name
+ * @return unique part of the WAL file name
+ * @throws IOException exception
+ * @throws IllegalArgumentException exception
+ */
+ public static String getUniqueWALFileNamePart(String walFileName) throws IOException {
+ return getUniqueWALFileNamePart(new Path(walFileName));
+ }
+
+ /**
+ * Returns the unique part of a WAL file name
+ * @param p WAL file path
+ * @return unique part of the WAL file name
+ * @throws IOException exception
+ */
+ public static String getUniqueWALFileNamePart(Path p) throws IOException {
+ return p.getName();
+ }
+
+ /**
+ * Get the total length of files under the given directory recursively.
+ * @param fs The hadoop file system
+ * @param dir The target directory
+ * @return the total length of files
+ * @throws IOException exception
+ */
+ public static long getFilesLength(FileSystem fs, Path dir) throws IOException {
+ long totalLength = 0;
+ FileStatus[] files = FSUtils.listStatus(fs, dir);
+ if (files != null) {
+ for (FileStatus fileStatus : files) {
+ if (fileStatus.isDirectory()) {
+ totalLength += getFilesLength(fs, fileStatus.getPath());
+ } else {
+ totalLength += fileStatus.getLen();
+ }
+ }
+ }
+ return totalLength;
+ }
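+
+ // Usage sketch (illustrative; the directory is hypothetical):
+ //
+ // FileSystem fs = FileSystem.get(conf);
+ // long bytes = getFilesLength(fs, new Path("/backup/backup_1396650096738"));
+ // // total size in bytes of all files under the directory, recursively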
+
+ /**
+ * Get the list of all WAL files (active and archived) older than the given per-host timestamps
+ * @param c configuration
+ * @param hostTimestampMap map of host name ("host:port") to its last backup timestamp
+ * @return list of WAL files
+ * @throws IOException exception
+ */
+ public static List<String> getWALFilesOlderThan(final Configuration c,
+ final HashMap<String, Long> hostTimestampMap) throws IOException {
+ Path rootDir = FSUtils.getRootDir(c);
+ Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ List<String> logFiles = new ArrayList<String>();
+
+ PathFilter filter = new PathFilter() {
+
+ @Override
+ public boolean accept(Path p) {
+ try {
+ if (AbstractFSWALProvider.isMetaFile(p)) {
+ return false;
+ }
+ String host = parseHostNameFromLogFile(p);
+ if (host == null) {
+ return false;
+ }
+ Long oldTimestamp = hostTimestampMap.get(host);
+ Long currentLogTS = BackupUtils.getCreationTime(p);
+ return currentLogTS <= oldTimestamp;
+ } catch (Exception e) {
+ LOG.warn("Can not parse" + p, e);
+ return false;
+ }
+ }
+ };
+ FileSystem fs = FileSystem.get(c);
+ logFiles = BackupUtils.getFiles(fs, logDir, logFiles, filter);
+ logFiles = BackupUtils.getFiles(fs, oldLogDir, logFiles, filter);
+ return logFiles;
+ }
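+
+ // Usage sketch (illustrative; the host and timestamp below are hypothetical):
+ //
+ // HashMap<String, Long> lastBackupTS = new HashMap<String, Long>();
+ // lastBackupTS.put("host1:16020", 1396650096738L);
+ // List<String> stale = getWALFilesOlderThan(conf, lastBackupTS);
+ // // WAL files for host1:16020 created at or before the given timestamp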
+
+ public static TableName[] parseTableNames(String tables) {
+ if (tables == null) {
+ return null;
+ }
+ String[] tableArray = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+
+ TableName[] ret = new TableName[tableArray.length];
+ for (int i = 0; i < tableArray.length; i++) {
+ ret[i] = TableName.valueOf(tableArray[i]);
+ }
+ return ret;
+ }
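+
+ // Usage sketch (illustrative; assumes the command-line delimiter defined by
+ // BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND is a comma):
+ //
+ // TableName[] tables = parseTableNames("ns1:t1,ns2:t2");
+ // // [ns1:t1, ns2:t2]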
+
+ /**
+ * Check whether the backup path exists
+ * @param backupStr backup path
+ * @param conf configuration
+ * @return true if the path exists
+ * @throws IOException exception
+ */
+ public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException {
+ Path backupPath = new Path(backupStr);
+ FileSystem fileSys = backupPath.getFileSystem(conf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Scheme of given url: " + backupStr + " is: " + fileSys.getUri().getScheme());
+ }
+ return fileSys.exists(backupPath);
+ }
+
+ /**
+ * Check that the backup target path is reachable and report whether it already exists
+ * @param backupRootPath backup destination path
+ * @param conf configuration
+ * @throws IOException exception
+ */
+ public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException {
+ boolean targetExists = false;
+ try {
+ targetExists = checkPathExist(backupRootPath, conf);
+ } catch (IOException e) {
+ String expMsg = e.getMessage();
+ if (expMsg.contains("No FileSystem for scheme")) {
+ String newMsg =
+ "Unsupported filesystem scheme found in the backup target url. Error Message: "
+ + expMsg;
+ LOG.error(newMsg);
+ throw new IOException(newMsg);
+ } else {
+ throw e;
+ }
+ }
+
+ if (targetExists) {
+ LOG.info("Using existing backup root dir: " + backupRootPath);
+ } else {
+ LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be created.");
+ }
+ }
+
+ /**
+ * Get the min value among all the values in a map.
+ * @param map map of keys to timestamps
+ * @return the min value, or null if the map is null or empty
+ */
+ public static <T> Long getMinValue(HashMap<T, Long> map) {
+ Long minTimestamp = null;
+ if (map != null && !map.isEmpty()) {
+ // The min among all the RS log timestamps will be kept in the backup system table
+ minTimestamp = Collections.min(map.values());
+ }
+ return minTimestamp;
+ }
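+
+ // Usage sketch (illustrative values):
+ //
+ // HashMap<String, Long> rsLogTimestamps = new HashMap<String, Long>();
+ // rsLogTimestamps.put("host1:16020", 1396650096738L);
+ // rsLogTimestamps.put("host2:16020", 1396650096000L);
+ // Long min = getMinValue(rsLogTimestamps); // 1396650096000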
+
+ /**
+ * Parses hostname:port from an archived WAL path
+ * @param p path
+ * @return hostname:port string, or null if the path cannot be parsed
+ */
+ public static String parseHostFromOldLog(Path p) {
+ try {
+ String n = p.getName();
+ int idx = n.lastIndexOf(LOGNAME_SEPARATOR);
+ String s = URLDecoder.decode(n.substring(0, idx), "UTF8");
+ return ServerName.parseHostname(s) + ":" + ServerName.parsePort(s);
+ } catch (Exception e) {
+ LOG.warn("Skip log file (can't parse): " + p);
+ return null;
+ }
+ }
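+
+ // Usage sketch (illustrative; the archived WAL name below is hypothetical but follows
+ // the "<URL-encoded server name>.<timestamp>" convention this method expects):
+ //
+ // Path p = new Path("/hbase/oldWALs/host1%2C16020%2C1396650096738.1396650096999");
+ // String host = parseHostFromOldLog(p); // "host1:16020"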
+
+ /**
+ * Given the log file, parse the timestamp from the file name. The timestamp is the last number.
+ * @param p a path to the log file
+ * @return the timestamp
+ * @throws IOException exception
+ */
+ public static Long getCreationTime(Path p) throws IOException {
+ int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR);
+ if (idx < 0) {
+ throw new IOException("Cannot parse timestamp from path " + p);
+ }
+ String ts = p.getName().substring(idx + 1);
+ return Long.parseLong(ts);
+ }
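+
+ // Usage sketch (illustrative; same hypothetical archived WAL name as above):
+ //
+ // Long ts = getCreationTime(
+ // new Path("/hbase/oldWALs/host1%2C16020%2C1396650096738.1396650096999"));
+ // // 1396650096999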
+
+ public static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files,
+ PathFilter filter) throws FileNotFoundException, IOException {
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true);
+
+ while (it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if (lfs.isDirectory()) {
+ continue;
+ }
+ // apply filter
+ if (filter.accept(lfs.getPath())) {
+ files.add(lfs.getPath().toString());
+ }
+ }
+ return files;
+ }
+
+ public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException {
+ cleanupHLogDir(context, conf);
+ cleanupTargetDir(context, conf);
+ }
+
+ /**
+ * Clean up directories generated when DistCp copies hlogs
+ * @param backupInfo backup info
+ * @param conf configuration
+ * @throws IOException exception
+ */
+ private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException {
+
+ String logDir = backupInfo.getHLogTargetDir();
+ if (logDir == null) {
+ LOG.warn("No log directory specified for " + backupInfo.getBackupId());
+ return;
+ }
+
+ Path rootPath = new Path(logDir).getParent();
+ FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+ FileStatus[] files = listStatus(fs, rootPath, null);
+ if (files == null) {
+ return;
+ }
+ for (FileStatus file : files) {
+ LOG.debug("Delete log files: " + file.getPath().getName());
+ fs.delete(file.getPath(), true);
+ }
+ }
+
+ private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
+ try {
+ // clean up the data at target directory
+ LOG.debug("Trying to cleanup up target dir : " + backupInfo.getBackupId());
+ String targetDir = backupInfo.getBackupRootDir();
+ if (targetDir == null) {
+ LOG.warn("No target directory specified for " + backupInfo.getBackupId());
+ return;
+ }
+
+ FileSystem outputFs = FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf);
+
+ for (TableName table : backupInfo.getTables()) {
+ Path targetDirPath =
+ new Path(getTableBackupDir(backupInfo.getBackupRootDir(), backupInfo.getBackupId(),
+ table));
+ if (outputFs.delete(targetDirPath, true)) {
+ LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done.");
+ } else {
+ LOG.info("No data has been found in " + targetDirPath.toString() + ".");
+ }
+
+ Path tableDir = targetDirPath.getParent();
+ FileStatus[] backups = listStatus(outputFs, tableDir, null);
+ if (backups == null || backups.length == 0) {
+ outputFs.delete(tableDir, true);
+ LOG.debug(tableDir.toString() + " is empty, removed it.");
+ }
+ }
+ outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true);
+ } catch (IOException e1) {
+ LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " at "
+ + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".");
+ }
+ }
+
+ /**
+ * Given the backup root dir, backup id and the table name, return the backup image location,
+ * which is also where the backup manifest file is located. The return value looks like:
+ * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/"
+ * @param backupRootDir backup root directory
+ * @param backupId backup id
+ * @param tableName table name
+ * @return backupPath String for the particular table
+ */
+ public static String
+ getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
+ return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+ + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
+ + Path.SEPARATOR;
+ }
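+
+ // Usage sketch (illustrative arguments, matching the javadoc example above):
+ //
+ // String dir = getTableBackupDir("hdfs://backup.hbase.org:9000/user/biadmin/backup1",
+ // "backup_1396650096738", TableName.valueOf("default:t1_dn"));
+ // // "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/"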
+
+ /**
+ * Sort history list by start time in descending order.
+ * @param historyList history list
+ * @return sorted list of BackupInfo
+ */
+ public static ArrayList<BackupInfo> sortHistoryListDesc(ArrayList<BackupInfo> historyList) {
+ ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(historyList);
+ // Sort directly by start timestamp (newest first) so that entries sharing a start
+ // timestamp are preserved
+ Collections.sort(list, new Comparator<BackupInfo>() {
+ @Override
+ public int compare(BackupInfo o1, BackupInfo o2) {
+ return Long.compare(o2.getStartTs(), o1.getStartTs());
+ }
+ });
+ return list;
+ }
+
+ /**
+ * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
+ * differences between Hadoop versions: Hadoop 1 does not throw a FileNotFoundException and
+ * returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
+ * @param fs file system
+ * @param dir directory
+ * @param filter path filter
+ * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+ */
+ public static FileStatus[]
+ listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException {
+ FileStatus[] status = null;
+ try {
+ status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
+ } catch (FileNotFoundException fnfe) {
+ // if directory doesn't exist, return null
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(dir + " doesn't exist");
+ }
+ }
+ if (status == null || status.length < 1) return null;
+ return status;
+ }
+
+ /**
+ * Return the 'path' component of a Path. In Hadoop, Path is a URI. This method returns the
+ * 'path' component of a Path's URI: e.g. If a Path is
+ * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
+ * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
+ * out a Path without the qualifying FileSystem instance.
+ * @param p file system Path whose 'path' component we are to return.
+ * @return the 'path' component of the Path's URI
+ */
+ public static String getPath(Path p) {
+ return p.toUri().getPath();
+ }
+
+ /**
+ * Given the backup root dir and the backup id, return the log file location for an incremental
+ * backup.
+ * @param backupRootDir backup root directory
+ * @param backupId backup id
+ * @return logBackupDir: ".../user/biadmin/backup1/backup_1396650096738/WALs"
+ */
+ public static String getLogBackupDir(String backupRootDir, String backupId) {
+ return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+ + HConstants.HREGION_LOGDIR_NAME;
+ }
+
+ private static List<BackupInfo> getHistory(Configuration conf, Path backupRootPath)
+ throws IOException {
+ // Get all backup history from the backup root destination
+ FileSystem fs = FileSystem.get(conf);
+ RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath);
+
+ List<BackupInfo> infos = new ArrayList<BackupInfo>();
+ while (it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if (!lfs.isDirectory()) continue;
+ String backupId = lfs.getPath().getName();
+ try {
+ BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs);
+ infos.add(info);
+ } catch (IOException e) {
+ LOG.error("Can not load backup info from: " + lfs.getPath(), e);
+ }
+ }
+ // Sort
+ Collections.sort(infos, new Comparator<BackupInfo>() {
+
+ @Override
+ public int compare(BackupInfo o1, BackupInfo o2) {
+ long ts1 = getTimestamp(o1.getBackupId());
+ long ts2 = getTimestamp(o2.getBackupId());
+ if (ts1 == ts2) return 0;
+ return ts1 < ts2 ? 1 : -1;
+ }
+
+ private long getTimestamp(String backupId) {
+ String[] split = backupId.split("_");
+ return Long.parseLong(split[1]);
+ }
+ });
+ return infos;
+ }
+
+ public static List<BackupInfo> getHistory(Configuration conf, int n, Path backupRootPath,
+ BackupInfo.Filter... filters) throws IOException {
+ List<BackupInfo> infos = getHistory(conf, backupRootPath);
+ List<BackupInfo> ret = new ArrayList<BackupInfo>();
+ for (BackupInfo info : infos) {
+ if (ret.size() == n) {
+ break;
+ }
+ boolean passed = true;
+ for (int i = 0; i < filters.length; i++) {
+ if (!filters[i].apply(info)) {
+ passed = false;
+ break;
+ }
+ }
+ if (passed) {
+ ret.add(info);
+ }
+ }
+ return ret;
+ }
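+
+ // Usage sketch (illustrative; fetches the 10 most recent backups whose id carries the
+ // standard "backup_" prefix, under a hypothetical root path; 'conf' is assumed):
+ //
+ // BackupInfo.Filter byIdPrefix = new BackupInfo.Filter() {
+ // @Override
+ // public boolean apply(BackupInfo info) {
+ // return info.getBackupId().startsWith("backup_");
+ // }
+ // };
+ // List<BackupInfo> recent =
+ // getHistory(conf, 10, new Path("hdfs://nn:9000/user/biadmin/backup1"), byIdPrefix);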
+
+ public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs)
+ throws IOException {
+ Path backupPath = new Path(backupRootPath, backupId);
+
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupPath, true);
+ while (it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if (lfs.getPath().getName().equals(BackupManifest.MANIFEST_FILE_NAME)) {
+ // Load BackupManifest
+ BackupManifest manifest = new BackupManifest(fs, lfs.getPath().getParent());
+ BackupInfo info = manifest.toBackupInfo();
+ return info;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Create restore request.
+ * @param backupRootDir backup root dir
+ * @param backupId backup id
+ * @param check check only
+ * @param fromTables tables to restore from
+ * @param toTables tables to restore to
+ * @param isOverwrite overwrite existing data
+ * @return request object
+ */
+ public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
+ boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
+ RestoreRequest.Builder builder = new RestoreRequest.Builder();
+ RestoreRequest request =
+ builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
+ .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
+ return request;
+ }
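+
+ // Usage sketch (illustrative arguments; restores hypothetical table t1 into
+ // t1_restored, overwriting existing data):
+ //
+ // RestoreRequest req = createRestoreRequest("hdfs://nn:9000/user/biadmin/backup1",
+ // "backup_1396650096738", false,
+ // new TableName[] { TableName.valueOf("t1") },
+ // new TableName[] { TableName.valueOf("t1_restored") }, true);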
+
+ public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap,
+ Configuration conf) throws IOException {
+ boolean isValid = true;
+
+ for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) {
+ TableName table = manifestEntry.getKey();
+ TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>();
+
+ ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table);
+ if (depList != null && !depList.isEmpty()) {
+ imageSet.addAll(depList);
+ }
+
+ LOG.info("Dependent image(s) from old to new:");
+ for (BackupImage image : imageSet) {
+ String imageDir =
+ HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table);
+ if (!BackupUtils.checkPathExist(imageDir, conf)) {
+ LOG.error("ERROR: backup image does not exist: " + imageDir);
+ isValid = false;
+ break;
+ }
+ LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available");
+ }
+ }
+ return isValid;
+ }
+
+ public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ String tmp =
+ conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ Path path =
+ new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+ + EnvironmentEdgeManager.currentTime());
+ if (deleteOnExit) {
+ fs.deleteOnExit(path);
+ }
+ return path;
+ }
+
+ public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
+ return getBulkOutputDir(tableName, conf, true);
+ }
+
+ public static String getFileNameCompatibleString(TableName table) {
+ return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+ }
+
+ public static boolean failed(int result) {
+ return result != 0;
+ }
+
+ public static boolean succeeded(int result) {
+ return result == 0;
+ }
+
+ public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+ // Set configuration for restore: LoadIncrementalHFiles needs more time,
+ // so raise hbase.rpc.timeout to one hour
+ Configuration conf = new Configuration(config);
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR);
+
+ // By default, it is 32 and the loader will fail if the number of files in any region
+ // exceeds this limit. Bad for snapshot restore.
+ conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+ conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+ try {
+ return new LoadIncrementalHFiles(conf);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
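+
+ // Usage sketch (illustrative; 'conn', 'table' and 'regionLocator' would come from a real
+ // Connection, and 'hfileDir' is a hypothetical directory of HFiles produced by a restore
+ // job; doBulkLoad signature per the LoadIncrementalHFiles API of this era):
+ //
+ // LoadIncrementalHFiles loader = createLoader(conf);
+ // loader.doBulkLoad(hfileDir, conn.getAdmin(), table, regionLocator);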
+}