Posted to commits@hbase.apache.org by te...@apache.org on 2017/03/10 23:37:51 UTC
[05/10] hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2 (Vladimir Rodionov)
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
new file mode 100644
index 0000000..0f1453e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * After a full backup has been created, an incremental backup stores only the changes made since
+ * the last full or incremental backup. Creating the backup copies the log files in .logs and
+ * .oldlogs that are newer than the last backup timestamp.
+ */
+@InterfaceAudience.Private
+public class IncrementalBackupManager extends BackupManager {
+ public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
+
+ public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException {
+ super(conn, conf);
+ }
+
+ /**
+ * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
+ * in BackupInfo.
+ * @param conn the Connection
+ * @param backupInfo backup info
+ * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+ * @throws IOException exception
+ */
+ public HashMap<String, Long> getIncrBackupLogFileList(Connection conn, BackupInfo backupInfo)
+ throws IOException {
+ List<String> logList;
+ HashMap<String, Long> newTimestamps;
+ HashMap<String, Long> previousTimestampMins;
+
+ String savedStartCode = readBackupStartCode();
+
+ // key: tableName
+ // value: <RegionServer,PreviousTimeStamp>
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+ previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+ }
+ // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+ if (savedStartCode == null || previousTimestampMins == null
+ || previousTimestampMins.isEmpty()) {
+ throw new IOException(
+ "Cannot read any previous back up timestamps from backup system table. "
+ + "In order to create an incremental backup, at least one full backup is needed.");
+ }
+
+ LOG.info("Execute roll log procedure for incremental backup ...");
+ HashMap<String, String> props = new HashMap<String, String>();
+ props.put("backupRoot", backupInfo.getBackupRootDir());
+
+ try (Admin admin = conn.getAdmin();) {
+
+ admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+ LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+ }
+ newTimestamps = readRegionServerLastLogRollResult();
+
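+ // WAL files already recorded in the backup system table by earlier incremental backups are
+ // excluded from the copy list below; only new WAL files are copied for this backup.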
+ logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+ List<WALItem> logFromSystemTable =
+ getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+ .getBackupRootDir());
+ addLogsFromBackupSystemToContext(logFromSystemTable);
+
+ logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+ backupInfo.setIncrBackupFileList(logList);
+
+ return newTimestamps;
+ }
+
+ private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
+ List<WALItem> logFromSystemTable) {
+
+ List<String> backupedWALList = toWALList(logFromSystemTable);
+ logList.removeAll(backupedWALList);
+ return logList;
+ }
+
+ private List<String> toWALList(List<WALItem> logFromSystemTable) {
+
+ List<String> list = new ArrayList<String>(logFromSystemTable.size());
+ for (WALItem item : logFromSystemTable) {
+ list.add(item.getWalFile());
+ }
+ return list;
+ }
+
+ private void addLogsFromBackupSystemToContext(List<WALItem> logFromSystemTable) {
+ List<String> walFiles = new ArrayList<String>();
+ for (WALItem item : logFromSystemTable) {
+ Path p = new Path(item.getWalFile());
+ String walFileName = p.getName();
+ String backupId = item.getBackupId();
+ String relWALPath = backupId + Path.SEPARATOR + walFileName;
+ walFiles.add(relWALPath);
+ }
+ }
+
+ /**
+ * For each region server: get all log files newer than the last timestamps, but not newer than
+ * the newest timestamps.
+ * @param olderTimestamps timestamp map for each region server from the last backup.
+ * @param newestTimestamps timestamp map for each region server that the backup should lead to.
+ * @param backupRoot backup root directory that the WAL entries must belong to
+ * @return list of log files which need to be added to this backup
+ * @throws IOException exception
+ */
+ private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
+ HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
+ List<WALItem> logFiles = new ArrayList<WALItem>();
+ Iterator<WALItem> it = getWALFilesFromBackupSystem();
+ while (it.hasNext()) {
+ WALItem item = it.next();
+ String rootDir = item.getBackupRoot();
+ if (!rootDir.equals(backupRoot)) {
+ continue;
+ }
+ String walFileName = item.getWalFile();
+ String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName));
+ if (server == null) {
+ continue;
+ }
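+ // The WAL roll timestamp is encoded in the file name; keep only files whose timestamp
+ // falls inside this server's (oldTss, newTss) window.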
+ Long tss = getTimestamp(walFileName);
+ Long oldTss = olderTimestamps.get(server);
+ Long newTss = newestTimestamps.get(server);
+ if (oldTss == null) {
+ logFiles.add(item);
+ continue;
+ }
+ if (newTss == null) {
+ newTss = Long.MAX_VALUE;
+ }
+ if (tss > oldTss && tss < newTss) {
+ logFiles.add(item);
+ }
+ }
+ return logFiles;
+ }
+
+ private Long getTimestamp(String walFileName) {
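+ // The timestamp is the numeric suffix following the last log-name separator in the file name.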
+ int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR);
+ return Long.parseLong(walFileName.substring(index + 1));
+ }
+
+ /**
+ * For each region server: get all log files newer than the last timestamps but not newer than the
+ * newest timestamps.
+ * @param olderTimestamps the timestamp for each region server of the last backup.
+ * @param newestTimestamps the timestamp for each region server that the backup should lead to.
+ * @param conf the Hadoop and HBase configuration
+ * @param savedStartCode the startcode (timestamp) of last successful backup.
+ * @return a list of log files to be backed up
+ * @throws IOException exception
+ */
+ private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
+ HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
+ throws IOException {
+ LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+ + "\n newestTimestamps: " + newestTimestamps);
+ Path rootdir = FSUtils.getRootDir(conf);
+ Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ FileSystem fs = rootdir.getFileSystem(conf);
+ NewestLogFilter pathFilter = new NewestLogFilter();
+
+ List<String> resultLogFiles = new ArrayList<String>();
+ List<String> newestLogs = new ArrayList<String>();
+
+ /*
+ * The old region servers and timestamps info we kept in backup system table may be out of sync
+ * if a new region server is added or an existing one is lost. We'll deal with it here when
+ * processing the logs. If data in the backup system table has more hosts, just ignore it. If the
+ * .logs directory includes more hosts, the additional hosts will not have old timestamps to
+ * compare with. We'll just use all the logs in that directory. We always write up-to-date region
+ * server and timestamp info to the backup system table at the end of a successful backup.
+ */
+
+ FileStatus[] rss;
+ Path p;
+ String host;
+ Long oldTimeStamp;
+ String currentLogFile;
+ long currentLogTS;
+
+ // Get the files in .logs.
+ rss = fs.listStatus(logDir);
+ for (FileStatus rs : rss) {
+ p = rs.getPath();
+ host = BackupUtils.parseHostNameFromLogFile(p);
+ if (host == null) {
+ continue;
+ }
+ FileStatus[] logs;
+ oldTimeStamp = olderTimestamps.get(host);
+ // It is possible that there is no old timestamp in backup system table for this host if
+ // this region server is newly added after our last backup.
+ if (oldTimeStamp == null) {
+ logs = fs.listStatus(p);
+ } else {
+ pathFilter.setLastBackupTS(oldTimeStamp);
+ logs = fs.listStatus(p, pathFilter);
+ }
+ for (FileStatus log : logs) {
+ LOG.debug("currentLogFile: " + log.getPath().toString());
+ if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
+ }
+ continue;
+ }
+ currentLogFile = log.getPath().toString();
+ resultLogFiles.add(currentLogFile);
+ currentLogTS = BackupUtils.getCreationTime(log.getPath());
+ // newestTimestamps is up-to-date with the current list of hosts
+ // so newestTimestamps.get(host) will not be null.
+ if (currentLogTS > newestTimestamps.get(host)) {
+ newestLogs.add(currentLogFile);
+ }
+ }
+ }
+
+ // Include the .oldlogs files too.
+ FileStatus[] oldlogs = fs.listStatus(oldLogDir);
+ for (FileStatus oldlog : oldlogs) {
+ p = oldlog.getPath();
+ currentLogFile = p.toString();
+ if (AbstractFSWALProvider.isMetaFile(p)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip .meta log file: " + currentLogFile);
+ }
+ continue;
+ }
+ host = BackupUtils.parseHostFromOldLog(p);
+ if (host == null) {
+ continue;
+ }
+ currentLogTS = BackupUtils.getCreationTime(p);
+ oldTimeStamp = olderTimestamps.get(host);
+ /*
+ * It is possible that there is no old timestamp in backup system table for this host. At the
+ * time of our last backup operation, this rs did not exist. The reason can be one of two:
+ * 1. The rs had already left or crashed, and its logs were moved to .oldlogs. 2. The rs was added
+ * after our last backup.
+ */
+ if (oldTimeStamp == null) {
+ if (currentLogTS < Long.parseLong(savedStartCode)) {
+ // This log file is really old; its region server was already gone before our last backup.
+ continue;
+ } else {
+ resultLogFiles.add(currentLogFile);
+ }
+ } else if (currentLogTS > oldTimeStamp) {
+ resultLogFiles.add(currentLogFile);
+ }
+
+ // It is possible that a host in .oldlogs is an obsolete region server
+ // so newestTimestamps.get(host) here can be null.
+ // Even if these logs belong to an obsolete region server, we still need
+ // to include them to avoid losing edits in the backup.
+ Long newTimestamp = newestTimestamps.get(host);
+ if (newTimestamp != null && currentLogTS > newTimestamp) {
+ newestLogs.add(currentLogFile);
+ }
+ }
+ // remove the newest log per host because it is still in use
+ resultLogFiles.removeAll(newestLogs);
+ return resultLogFiles;
+ }
+
+ static class NewestLogFilter implements PathFilter {
+ private Long lastBackupTS = 0L;
+
+ public NewestLogFilter() {
+ }
+
+ protected void setLastBackupTS(Long ts) {
+ this.lastBackupTS = ts;
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ // skip meta table log -- ts.meta file
+ if (AbstractFSWALProvider.isMetaFile(path)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip .meta log file: " + path.getName());
+ }
+ return false;
+ }
+ long timestamp;
+ try {
+ timestamp = BackupUtils.getCreationTime(path);
+ return timestamp > lastBackupTS;
+ } catch (Exception e) {
+ LOG.warn("Cannot read timestamp of log file " + path);
+ return false;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
new file mode 100644
index 0000000..395ed6d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+
+/**
+ * Incremental backup implementation.
+ * See the {@link #execute() execute} method.
+ *
+ */
+@InterfaceAudience.Private
+public class IncrementalTableBackupClient extends TableBackupClient {
+ private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class);
+
+ public IncrementalTableBackupClient(final Connection conn, final String backupId,
+ BackupRequest request) throws IOException {
+ super(conn, backupId, request);
+ }
+
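+ /**
+ * Returns only those WAL files from the list that still exist on the file system.
+ */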
+ private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ List<String> list = new ArrayList<String>();
+ for (String file : incrBackupFileList) {
+ if (fs.exists(new Path(file))) {
+ list.add(file);
+ } else {
+ LOG.warn("Can't find file: " + file);
+ }
+ }
+ return list;
+ }
+
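+ /**
+ * Returns the subset of WAL files from the list that no longer exist on the file system.
+ */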
+ private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ List<String> list = new ArrayList<String>();
+ for (String file : incrBackupFileList) {
+ if (!fs.exists(new Path(file))) {
+ list.add(file);
+ }
+ }
+ return list;
+
+ }
+
+ /**
+ * Do incremental copy.
+ * @param backupInfo backup info
+ */
+ private void incrementalCopy(BackupInfo backupInfo) throws Exception {
+
+ LOG.info("Incremental copy is starting.");
+ // set overall backup phase: incremental_copy
+ backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
+ // get incremental backup file list and prepare params for DistCp
+ List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+ // filter missing files out (they have been copied by previous backups)
+ incrBackupFileList = filterMissingFiles(incrBackupFileList);
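+ // DistCp expects the source paths first, with the target directory as the last array element.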
+ String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+ strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
+
+ BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+ int counter = 0;
+ int MAX_ITERATIONS = 2;
+ while (counter++ < MAX_ITERATIONS) {
+ // We run DistCp at most twice;
+ // if it fails the second time, we throw an exception
+ int res =
+ copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
+
+ if (res != 0) {
+ LOG.error("Copy incremental log files failed with return code: " + res + ".");
+ throw new IOException("Failed of Hadoop Distributed Copy from "
+ + StringUtils.join(incrBackupFileList, ",") + " to "
+ + backupInfo.getHLogTargetDir());
+ }
+ List<String> missingFiles = getMissingFiles(incrBackupFileList);
+
+ if (missingFiles.isEmpty()) {
+ break;
+ } else {
+ // Repeat DistCp: some files have been moved from WALs to oldWALs during the previous run,
+ // so update backupInfo and strArr
+ if (counter == MAX_ITERATIONS) {
+ String msg =
+ "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
+ incrBackupFileList.removeAll(missingFiles);
+ incrBackupFileList.addAll(converted);
+ backupInfo.setIncrBackupFileList(incrBackupFileList);
+
+ // Run DistCp only for missing files (which have been moved from WALs to oldWALs
+ // during previous run)
+ strArr = converted.toArray(new String[converted.size() + 1]);
+ strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
+ }
+ }
+
+ LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
+ + backupInfo.getHLogTargetDir() + " finished.");
+ }
+
+ private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
+ List<String> list = new ArrayList<String>();
+ for (String path : missingFiles) {
+ if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
+ LOG.error("Copy incremental log files failed, file is missing : " + path);
+ throw new IOException("Failed of Hadoop Distributed Copy to "
+ + backupInfo.getHLogTargetDir() + ", file is missing " + path);
+ }
+ list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
+ + HConstants.HREGION_OLDLOGDIR_NAME));
+ }
+ return list;
+ }
+
+ @Override
+ public void execute() throws IOException {
+
+ // case PREPARE_INCREMENTAL:
+ beginBackup(backupManager, backupInfo);
+ backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
+ LOG.debug("For incremental backup, current table set is "
+ + backupManager.getIncrementalBackupTableSet());
+ try {
+ newTimestamps =
+ ((IncrementalBackupManager) backupManager).getIncrBackupLogFileList(conn, backupInfo);
+ } catch (Exception e) {
+ // fail the overall backup and return
+ failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
+ BackupType.INCREMENTAL, conf);
+ }
+
+ // case INCREMENTAL_COPY:
+ try {
+ // copy out the table and region info files for each table
+ BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+ incrementalCopy(backupInfo);
+ // Save list of WAL files copied
+ backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
+ } catch (Exception e) {
+ String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
+ // fail the overall backup and return
+ failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
+ }
+ // case INCR_BACKUP_COMPLETE:
+ // set overall backup status: complete. Here we make sure to complete the backup.
+ // After this checkpoint, even if a cancel is requested, the backup is allowed to finish.
+ try {
+ backupInfo.setState(BackupState.COMPLETE);
+ // Set the previousTimestampMap which is before this current log roll to the manifest.
+ HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
+ backupManager.readLogTimestampMap();
+ backupInfo.setIncrTimestampMap(previousTimestampMap);
+
+ // The table list in backupInfo is good for both full backup and incremental backup.
+ // For incremental backup, it contains the incremental backup table set.
+ backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+
+ HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+ backupManager.readLogTimestampMap();
+
+ Long newStartCode =
+ BackupUtils.getMinValue(BackupUtils
+ .getRSLogTimestampMins(newTableSetTimestampMap));
+ backupManager.writeBackupStartCode(newStartCode);
+ // backup complete
+ completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);
+
+ } catch (IOException e) {
+ failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
+ BackupType.INCREMENTAL, conf);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
new file mode 100644
index 0000000..f418305
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.util.RestoreTool;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+
+/**
+ * Restore table implementation
+ *
+ */
+@InterfaceAudience.Private
+public class RestoreTablesClient {
+ private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class);
+
+ private Configuration conf;
+ private Connection conn;
+ private String backupId;
+ private TableName[] sTableArray;
+ private TableName[] tTableArray;
+ private String targetRootDir;
+ private boolean isOverwrite;
+
+ public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
+ this.targetRootDir = request.getBackupRootDir();
+ this.backupId = request.getBackupId();
+ this.sTableArray = request.getFromTables();
+ this.tTableArray = request.getToTables();
+ if (tTableArray == null || tTableArray.length == 0) {
+ this.tTableArray = sTableArray;
+ }
+ this.isOverwrite = request.isOverwrite();
+ this.conn = conn;
+ this.conf = conn.getConfiguration();
+
+ }
+
+ /**
+ * Validate target tables
+ * @param conn connection
+ * @param mgr table state manager
+ * @param tTableArray: target tables
+ * @param isOverwrite overwrite existing table
+ * @throws IOException exception
+ */
+ private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite) throws IOException {
+ ArrayList<TableName> existTableList = new ArrayList<>();
+ ArrayList<TableName> disabledTableList = new ArrayList<>();
+
+ // check if the tables already exist
+ try (Admin admin = conn.getAdmin();) {
+ for (TableName tableName : tTableArray) {
+ if (admin.tableExists(tableName)) {
+ existTableList.add(tableName);
+ if (admin.isTableDisabled(tableName)) {
+ disabledTableList.add(tableName);
+ }
+ } else {
+ LOG.info("HBase table " + tableName
+ + " does not exist. It will be created during restore process");
+ }
+ }
+ }
+
+ if (existTableList.size() > 0) {
+ if (!isOverwrite) {
+ LOG.error("Existing table ("
+ + existTableList
+ + ") found in the restore target, please add "
+ + "\"-overwrite\" option in the command if you mean"
+ + " to restore to these existing tables");
+ throw new IOException("Existing table found in target while no \"-overwrite\" "
+ + "option found");
+ } else {
+ if (disabledTableList.size() > 0) {
+ LOG.error("Found offline table in the restore target, "
+ + "please enable them before restore with \"-overwrite\" option");
+ LOG.info("Offline table list in restore target: " + disabledTableList);
+ throw new IOException(
+ "Found offline table in the target when restore with \"-overwrite\" option");
+ }
+ }
+ }
+ }
+
+ /**
+ * Restore operation: handle each BackupImage in the array
+ * @param images array of BackupImage
+ * @param sTable table to be restored
+ * @param tTable table to restore to
+ * @param truncateIfExists truncate the target table if it already exists
+ * @throws IOException exception
+ */
+
+ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
+ boolean truncateIfExists) throws IOException {
+
+ // First image MUST be image of a FULL backup
+ BackupImage image = images[0];
+ String rootDir = image.getRootDir();
+ String backupId = image.getBackupId();
+ Path backupRoot = new Path(rootDir);
+ RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId);
+ Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
+ String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
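+ // The last image in the chain, if any, identifies the most recent incremental backup id.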
+ // We need hFS only for full restore (see the code)
+ BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+ if (manifest.getType() == BackupType.FULL) {
+ LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
+ + tableBackupPath.toString());
+ restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
+ lastIncrBackupId);
+ } else { // incremental Backup
+ throw new IOException("Unexpected backup type " + image.getType());
+ }
+
+ if (images.length == 1) {
+ // full backup restore done
+ return;
+ }
+
+ List<Path> dirList = new ArrayList<Path>();
+ // collect the log backup directories of the incremental images; the full backup image
+ // (index 0) has already been restored above
+ for (int i = 1; i < images.length; i++) {
+ BackupImage im = images[i];
+ String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+ dirList.add(new Path(logBackupDir));
+ }
+
+ String dirs = StringUtils.join(dirList, ",");
+ LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
+ Path[] paths = new Path[dirList.size()];
+ dirList.toArray(paths);
+ restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
+ new TableName[] { tTable }, lastIncrBackupId);
+ LOG.info(sTable + " has been successfully restored to " + tTable);
+
+ }
+
+ /**
+ * Restore operation. Stage 2: resolve BackupImage dependencies
+ * @param backupManifestMap map of table name to its backup manifest
+ * @param sTableArray the array of tables to be restored
+ * @param tTableArray the array of tables to restore to
+ * @param isOverwrite overwrite existing tables
+ * @throws IOException exception
+ */
+ private void restore(HashMap<TableName, BackupManifest> backupManifestMap,
+ TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
+ TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+ boolean truncateIfExists = isOverwrite;
+ try {
+ for (int i = 0; i < sTableArray.length; i++) {
+ TableName table = sTableArray[i];
+ BackupManifest manifest = backupManifestMap.get(table);
+ // Get the image list of this backup for restore in time order from old
+ // to new.
+ List<BackupImage> list = new ArrayList<BackupImage>();
+ list.add(manifest.getBackupImage());
+ TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+ List<BackupImage> depList = manifest.getDependentListByTable(table);
+ set.addAll(depList);
+ BackupImage[] arr = new BackupImage[set.size()];
+ set.toArray(arr);
+ restoreImages(arr, table, tTableArray[i], truncateIfExists);
+ restoreImageSet.addAll(list);
+ if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+ LOG.info("Restore includes the following image(s):");
+ for (BackupImage image : restoreImageSet) {
+ LOG.info("Backup: "
+ + image.getBackupId()
+ + " "
+ + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+ table));
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed", e);
+ throw new IOException(e);
+ }
+ LOG.debug("restoreStage finished");
+ }
+
+ public void execute() throws IOException {
+
+ // case VALIDATION:
+ // check the target tables
+ checkTargetTables(tTableArray, isOverwrite);
+ // case RESTORE_IMAGES:
+ HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+ // check and load backup image manifest for the tables
+ Path rootPath = new Path(targetRootDir);
+ HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
+ backupId);
+ restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
new file mode 100644
index 0000000..42a8076
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Base class for backup operations. Concrete implementations for
+ * full and incremental backups are delegated to the corresponding sub-classes:
+ * {@link FullTableBackupClient} and {@link IncrementalTableBackupClient}
+ *
+ */
+@InterfaceAudience.Private
+public abstract class TableBackupClient {
+ private static final Log LOG = LogFactory.getLog(TableBackupClient.class);
+
+ protected Configuration conf;
+ protected Connection conn;
+ protected String backupId;
+ protected List<TableName> tableList;
+ protected HashMap<String, Long> newTimestamps = null;
+
+ protected BackupManager backupManager;
+ protected BackupInfo backupInfo;
+
+ public TableBackupClient(final Connection conn, final String backupId, BackupRequest request)
+ throws IOException {
+ if (request.getBackupType() == BackupType.FULL) {
+ backupManager = new BackupManager(conn, conn.getConfiguration());
+ } else {
+ backupManager = new IncrementalBackupManager(conn, conn.getConfiguration());
+ }
+ this.backupId = backupId;
+ this.tableList = request.getTableList();
+ this.conn = conn;
+ this.conf = conn.getConfiguration();
+ backupInfo =
+ backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
+ request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
+ if (tableList == null || tableList.isEmpty()) {
+ this.tableList = new ArrayList<>(backupInfo.getTables());
+ }
+ }
+
+ /**
+ * Begin the overall backup.
+ * @param backupInfo backup info
+ * @throws IOException exception
+ */
+ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
+ throws IOException {
+ backupManager.setBackupInfo(backupInfo);
+ // set the start timestamp of the overall backup
+ long startTs = EnvironmentEdgeManager.currentTime();
+ backupInfo.setStartTs(startTs);
+ // set overall backup status: ongoing
+ backupInfo.setState(BackupState.RUNNING);
+ backupInfo.setPhase(BackupPhase.REQUEST);
+ LOG.info("Backup " + backupInfo.getBackupId() + " started at " + startTs + ".");
+
+ backupManager.updateBackupInfo(backupInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Backup session " + backupInfo.getBackupId() + " has been started.");
+ }
+ }
+
+ private String getMessage(Exception e) {
+ String msg = e.getMessage();
+ if (msg == null || msg.equals("")) {
+ msg = e.getClass().getName();
+ }
+ return msg;
+ }
+
+ /**
+ * Delete HBase snapshot for backup.
+ * @param backupInfo backup info
+ * @throws IOException exception
+ */
+ private void deleteSnapshot(final Connection conn, BackupInfo backupInfo, Configuration conf)
+ throws IOException {
+ LOG.debug("Trying to delete snapshot for full backup.");
+ for (String snapshotName : backupInfo.getSnapshotNames()) {
+ if (snapshotName == null) {
+ continue;
+ }
+ LOG.debug("Trying to delete snapshot: " + snapshotName);
+
+ try (Admin admin = conn.getAdmin();) {
+ admin.deleteSnapshot(snapshotName);
+ } catch (IOException ioe) {
+ LOG.debug("when deleting snapshot " + snapshotName, ioe);
+ }
+ LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId()
+ + " succeeded.");
+ }
+ }
+
+ /**
+ * Clean up directories with prefix "exportSnapshot-", which are generated when exporting
+ * snapshots.
+ * @throws IOException exception
+ */
+ private void cleanupExportSnapshotLog(Configuration conf) throws IOException {
+ FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+ Path stagingDir =
+ new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory()
+ .toString()));
+ FileStatus[] files = FSUtils.listStatus(fs, stagingDir);
+ if (files == null) {
+ return;
+ }
+ for (FileStatus file : files) {
+ if (file.getPath().getName().startsWith("exportSnapshot-")) {
+ LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName());
+ if (!FSUtils.delete(fs, file.getPath(), true)) {
+ LOG.warn("Cannot delete " + file.getPath());
+ }
+ }
+ }
+ }
+
+ /**
+ * Clean up the uncompleted data at target directory if the ongoing backup has already entered
+ * the copy phase.
+ */
+ private void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
+ try {
+ // clean up the uncompleted data at target directory if the ongoing backup has already entered
+ // the copy phase
+ LOG.debug("Trying to cleanup up target dir. Current backup phase: "
+ + backupInfo.getPhase());
+ if (backupInfo.getPhase().equals(BackupPhase.SNAPSHOTCOPY)
+ || backupInfo.getPhase().equals(BackupPhase.INCREMENTAL_COPY)
+ || backupInfo.getPhase().equals(BackupPhase.STORE_MANIFEST)) {
+ FileSystem outputFs =
+ FileSystem.get(new Path(backupInfo.getBackupRootDir()).toUri(), conf);
+
+ // now treat one backup as a transaction, clean up data that has been partially copied at
+ // table level
+ for (TableName table : backupInfo.getTables()) {
+ Path targetDirPath =
+ new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(),
+ backupInfo.getBackupId(), table));
+ if (outputFs.delete(targetDirPath, true)) {
+ LOG.info("Cleaning up uncompleted backup data at " + targetDirPath.toString()
+ + " done.");
+ } else {
+ LOG.info("No data has been copied to " + targetDirPath.toString() + ".");
+ }
+
+ Path tableDir = targetDirPath.getParent();
+ FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir);
+ if (backups == null || backups.length == 0) {
+ outputFs.delete(tableDir, true);
+ LOG.debug(tableDir.toString() + " is empty, removing it.");
+ }
+ }
+ }
+
+ } catch (IOException e1) {
+ LOG.error("Cleaning up uncompleted backup data of " + backupInfo.getBackupId() + " at "
+ + backupInfo.getBackupRootDir() + " failed due to " + e1.getMessage() + ".");
+ }
+ }
+
+ /**
+ * Fail the overall backup.
+ * @param backupInfo backup info
+ * @param e exception
+ * @throws IOException exception
+ */
+ protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager,
+ Exception e, String msg, BackupType type, Configuration conf) throws IOException {
+ LOG.error(msg + getMessage(e), e);
+ // If this is a cancel exception, then we've already cleaned.
+
+ // set the failure timestamp of the overall backup
+ backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
+
+ // set failure message
+ backupInfo.setFailedMsg(e.getMessage());
+
+ // set overall backup status: failed
+ backupInfo.setState(BackupState.FAILED);
+
+ // compose the backup failed data
+ String backupFailedData =
+ "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs()
+ + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase()
+ + ",failedmessage=" + backupInfo.getFailedMsg();
+ LOG.error(backupFailedData);
+
+ backupManager.updateBackupInfo(backupInfo);
+
+ // if full backup, then delete HBase snapshots if any have already been taken,
+ // and also clean up export snapshot log files if they exist
+ if (type == BackupType.FULL) {
+ deleteSnapshot(conn, backupInfo, conf);
+ cleanupExportSnapshotLog(conf);
+ }
+
+ // clean up the uncompleted data at target directory if the ongoing backup has already entered
+ // the copy phase
+ // For incremental backup, DistCp logs will be cleaned with the targetDir.
+ cleanupTargetDir(backupInfo, conf);
+ LOG.info("Backup " + backupInfo.getBackupId() + " failed.");
+ }
+
+ /**
+ * Add manifest for the current backup. The manifest is stored within the table backup directory.
+ * @param backupInfo The current backup info
+ * @throws IOException exception
+ * @throws BackupException exception
+ */
+ private void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type,
+ Configuration conf) throws IOException, BackupException {
+ // set the overall backup phase : store manifest
+ backupInfo.setPhase(BackupPhase.STORE_MANIFEST);
+
+ BackupManifest manifest;
+
+ // Since we have each table's backup in its own directory structure,
+ // we'll store its manifest with the table directory.
+ for (TableName table : backupInfo.getTables()) {
+ manifest = new BackupManifest(backupInfo, table);
+ ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo, table);
+ for (BackupImage image : ancestors) {
+ manifest.addDependentImage(image);
+ }
+
+ if (type == BackupType.INCREMENTAL) {
+ // We'll store the log timestamps for this table only in its manifest.
+ HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
+ new HashMap<TableName, HashMap<String, Long>>();
+ tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table));
+ manifest.setIncrTimestampMap(tableTimestampMap);
+ ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo);
+ for (BackupImage image : ancestorss) {
+ manifest.addDependentImage(image);
+ }
+ }
+ manifest.store(conf);
+ }
+
+ // For incremental backup, we store an overall manifest in
+ // <backup-root-dir>/WALs/<backup-id>
+ // This is used when creating the next incremental backup
+ if (type == BackupType.INCREMENTAL) {
+ manifest = new BackupManifest(backupInfo);
+ // set the table region server start and end timestamps for incremental backup
+ manifest.setIncrTimestampMap(backupInfo.getIncrTimestampMap());
+ ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
+ for (BackupImage image : ancestors) {
+ manifest.addDependentImage(image);
+ }
+ manifest.store(conf);
+ }
+ }
+
+ /**
+ * Get backup request meta data dir as string.
+ * @param backupInfo backup info
+ * @return meta data dir
+ */
+ private String obtainBackupMetaDataStr(BackupInfo backupInfo) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("type=" + backupInfo.getType() + ",tablelist=");
+ for (TableName table : backupInfo.getTables()) {
+ sb.append(table + ";");
+ }
+ if (sb.lastIndexOf(";") > 0) {
+ sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
+ }
+ sb.append(",targetRootDir=" + backupInfo.getBackupRootDir());
+
+ return sb.toString();
+ }
+
+ /**
+ * Clean up directories with prefix "_distcp_logs-", which are generated when DistCp copying
+ * hlogs.
+ * @throws IOException exception
+ */
+ private void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
+ Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
+ FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+ FileStatus[] files = FSUtils.listStatus(fs, rootPath);
+ if (files == null) {
+ return;
+ }
+ for (FileStatus file : files) {
+ if (file.getPath().getName().startsWith("_distcp_logs")) {
+ LOG.debug("Delete log files of DistCp: " + file.getPath().getName());
+ FSUtils.delete(fs, file.getPath(), true);
+ }
+ }
+ }
+
+ /**
+ * Complete the overall backup.
+ * @param backupInfo backup info
+ * @throws IOException exception
+ */
+ protected void completeBackup(final Connection conn, BackupInfo backupInfo,
+ BackupManager backupManager, BackupType type, Configuration conf) throws IOException {
+ // set the complete timestamp of the overall backup
+ backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
+ // set overall backup status: complete
+ backupInfo.setState(BackupState.COMPLETE);
+ backupInfo.setProgress(100);
+ // add and store the manifest for the backup
+ addManifest(backupInfo, backupManager, type, conf);
+
+ // after the major steps are done and the manifest is persisted, convert if needed for
+ // incremental backup
+ /* in-flight conversion code here, to be provided by a future JIRA */
+ LOG.debug("in-flight conversion code here, to be provided by a future JIRA");
+
+ // compose the backup complete data
+ String backupCompleteData =
+ obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs()
+ + ",completets=" + backupInfo.getCompleteTs() + ",bytescopied="
+ + backupInfo.getTotalBytesCopied();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Backup " + backupInfo.getBackupId() + " finished: " + backupCompleteData);
+ }
+ backupManager.updateBackupInfo(backupInfo);
+
+ // when full backup is done:
+ // - delete HBase snapshot
+ // - clean up directories with prefix "exportSnapshot-", which are generated when exporting
+ // snapshots
+ if (type == BackupType.FULL) {
+ deleteSnapshot(conn, backupInfo, conf);
+ cleanupExportSnapshotLog(conf);
+ } else if (type == BackupType.INCREMENTAL) {
+ cleanupDistCpLog(backupInfo, conf);
+ }
+ LOG.info("Backup " + backupInfo.getBackupId() + " completed.");
+ }
+
+ /**
+ * Backup request execution
+ * @throws IOException
+ */
+ public abstract void execute() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
new file mode 100644
index 0000000..5641720
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
+ * for later bulk importing.
+ */
+@InterfaceAudience.Private
+public class HFileSplitterJob extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class);
+ final static String NAME = "HFileSplitterJob";
+ public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+ public final static String TABLES_KEY = "hfile.input.tables";
+ public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ public HFileSplitterJob() {
+ }
+
+ protected HFileSplitterJob(final Configuration c) {
+ super(c);
+ }
+
+ /**
+ * A mapper that just writes out cells. This one can be used together with
+ * {@link KeyValueSortReducer}
+ */
+ static class HFileCellMapper extends
+ Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+ @Override
+ public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+ InterruptedException {
+ // Convert value to KeyValue if subclass
+ if (!value.getClass().equals(KeyValue.class)) {
+ value =
+ new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+ value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+ value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+ value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
+ value.getValueOffset(), value.getValueLength());
+ }
+ context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ // do nothing
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Configuration conf = getConf();
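+ // args[0] is the comma-separated list of HFile input directories, args[1] is the target table.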
+ String inputDirs = args[0];
+ String tabName = args[1];
+ conf.setStrings(TABLES_KEY, tabName);
+ Job job =
+ Job.getInstance(conf,
+ conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+ job.setJarByClass(HFileSplitterJob.class);
+ FileInputFormat.addInputPaths(job, inputDirs);
+ job.setInputFormatClass(HFileInputFormat.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ if (hfileOutPath != null) {
+ LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+ TableName tableName = TableName.valueOf(tabName);
+ job.setMapperClass(HFileCellMapper.class);
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputValueClass(KeyValue.class);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+ }
+ LOG.debug("success configuring load incremental job");
+
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ com.google.common.base.Preconditions.class);
+ } else {
+ throw new IOException("No bulk output directory specified");
+ }
+ return job;
+ }
+
+ /**
+ * Print usage
+ * @param errorMsg Error message. Can be null.
+ */
+ private void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+ System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+ System.err.println("<table> table to load.\n");
+ System.err.println("To generate HFiles for a bulk data load, pass the option:");
+ System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err.println("Other options:");
+ System.err.println(" -D " + JOB_NAME_CONF_KEY
+ + "=jobName - use the specified mapreduce job name for the HFile splitter");
+ System.err.println("For performance also consider the following options:\n"
+ + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
+ }
+
+ /**
+ * Main entry point.
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ System.exit(-1);
+ }
+ Job job = createSubmittableJob(args);
+ int result = job.waitForCompletion(true) ? 0 : 1;
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
new file mode 100644
index 0000000..016d1a4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Map-Reduce implementation of {@link BackupCopyJob}. There are two types of copy operation:
+ * copying from a snapshot, which extends ExportSnapshot, and copying incremental log (WAL)
+ * files, which extends DistCp.
+ */
+@InterfaceAudience.Private
+public class MapReduceBackupCopyJob implements BackupCopyJob {
+ private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class);
+
+ private Configuration conf;
+
+ // Accumulated progress within the whole backup process for the copy operation
+ private float progressDone = 0.1f;
+ private long bytesCopied = 0;
+ private static float INIT_PROGRESS = 0.1f;
+
+ // The percentage of the current copy task within the whole task, if multiple copies are
+ // needed. The default value is 100%, which means a single copy task for the whole backup.
+ private float subTaskPercntgInWholeTask = 1f;
+
+ public MapReduceBackupCopyJob() {
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get the current copy task percentage within the whole task if multiple copies are needed.
+ * @return the current copy task percentage
+ */
+ public float getSubTaskPercntgInWholeTask() {
+ return subTaskPercntgInWholeTask;
+ }
+
+ /**
+ * Set the current copy task percentage within the whole task if multiple copies are needed. Must
+ * be called before calling
+ * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
+ * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+ */
+ public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+ this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+ }
+
+ static class SnapshotCopy extends ExportSnapshot {
+ private BackupInfo backupInfo;
+ private TableName table;
+
+ public SnapshotCopy(BackupInfo backupInfo, TableName table) {
+ super();
+ this.backupInfo = backupInfo;
+ this.table = table;
+ }
+
+ public TableName getTable() {
+ return this.table;
+ }
+
+ public BackupInfo getBackupInfo() {
+ return this.backupInfo;
+ }
+ }
+
+ /**
+ * Update the ongoing backup with new progress.
+ * @param backupInfo backup info
+ * @param backupManager backup manager
+ * @param newProgress progress
+ * @param bytesCopied bytes copied
+ * @throws IOException exception
+ */
+ static void updateProgress(BackupInfo backupInfo, BackupManager backupManager,
+ int newProgress, long bytesCopied) throws IOException {
+ // compose the new backup progress data, using fake number for now
+ String backupProgressData = newProgress + "%";
+
+ backupInfo.setProgress(newProgress);
+ backupManager.updateBackupInfo(backupInfo);
+ LOG.debug("Backup progress data \"" + backupProgressData
+ + "\" has been updated to backup system table for " + backupInfo.getBackupId());
+ }
+
+ /**
+ * Extends DistCp (DistCpV2, MAPREDUCE-2765) to report copy progress to the backup system
+ * table during backup. It overrides execute() to obtain the Job reference used for progress
+ * updates. Only the argument form "src1 [src2 ...] dst" is supported; no other DistCp
+ * options are accepted.
+ */
+ class BackupDistCp extends DistCp {
+
+ private BackupInfo backupInfo;
+ private BackupManager backupManager;
+
+ public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
+ BackupManager backupManager) throws Exception {
+ super(conf, options);
+ this.backupInfo = backupInfo;
+ this.backupManager = backupManager;
+ }
+
+ @Override
+ public Job execute() throws Exception {
+
+ // reflection preparation for private methods and fields
+ Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+ Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
+ Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+ Method methodCreateInputFileListing =
+ classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+ Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+ Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+ Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+ Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+ Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+ methodCreateMetaFolderPath.setAccessible(true);
+ methodCreateJob.setAccessible(true);
+ methodCreateInputFileListing.setAccessible(true);
+ methodCleanup.setAccessible(true);
+
+ fieldInputOptions.setAccessible(true);
+ fieldMetaFolder.setAccessible(true);
+ fieldJobFS.setAccessible(true);
+ fieldSubmitted.setAccessible(true);
+
+ // execute() logic starts here
+ assert fieldInputOptions.get(this) != null;
+
+ Job job = null;
+ try {
+ synchronized (this) {
+ // Don't cleanup while we are setting up.
+ fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+ fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
+ job = (Job) methodCreateJob.invoke(this);
+ }
+ methodCreateInputFileListing.invoke(this, job);
+
+ // Get the total length of the source files
+ List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+
+ long totalSrcLgth = 0;
+ for (Path aSrc : srcs) {
+ totalSrcLgth +=
+ BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
+ }
+
+ // submit the copy job
+ job.submit();
+ fieldSubmitted.set(this, true);
+
+ // after submitting the MR job, set its handler in the backup handler for the cancel process
+ // this.backupHandler.copyJob = job;
+
+ // Update the copy progress in the backup system table every 0.5s if the progress value changed
+ int progressReportFreq =
+ MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
+ 500);
+ float lastProgress = progressDone;
+ while (!job.isComplete()) {
+ float newProgress =
+ progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
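+ // For example, with progressDone = INIT_PROGRESS = 0.1 and a single copy task
+ // (subTaskPercntgInWholeTask = 1.0), a map progress of 50% yields
+ // 0.1 + 0.5 * 1.0 * (1 - 0.1) = 0.55, reported as 55% overall progress.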
+
+ if (newProgress > lastProgress) {
+
+ BigDecimal progressData =
+ new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+ String newProgressStr = progressData + "%";
+ LOG.info("Progress: " + newProgressStr);
+ updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
+ LOG.debug("Backup progress data updated to backup system table: \"Progress: "
+ + newProgressStr + ".\"");
+ lastProgress = newProgress;
+ }
+ Thread.sleep(progressReportFreq);
+ }
+ // update the progress data after copy job complete
+ float newProgress =
+ progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+ BigDecimal progressData =
+ new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+
+ String newProgressStr = progressData + "%";
+ LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
+ + " mapProgress: " + job.mapProgress());
+
+ // accumulate the overall backup progress
+ progressDone = newProgress;
+ bytesCopied += totalSrcLgth;
+
+ updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
+ LOG.debug("Backup progress data updated to backup system table: \"Progress: "
+ + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
+ } catch (Throwable t) {
+ LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
+ throw t;
+ } finally {
+ if (!fieldSubmitted.getBoolean(this)) {
+ methodCleanup.invoke(this);
+ }
+ }
+
+ String jobID = job.getJobID().toString();
+ job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+ LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
+ + job.isSuccessful());
+ Counters ctrs = job.getCounters();
+ LOG.debug(ctrs);
+ if (job.isComplete() && !job.isSuccessful()) {
+ throw new Exception("DistCp job-id: " + jobID + " failed");
+ }
+
+ return job;
+ }
+
+ }
+
+ /**
+ * Do backup copy based on different types.
+ * @param context The backup info
+ * @param backupManager The backup manager
+ * @param conf The hadoop configuration
+ * @param copyType The backup copy type
+ * @param options Options for customized ExportSnapshot or DistCp
+ * @return 0 on success, non-zero otherwise
+ * @throws IOException exception
+ */
+ @Override
+ public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
+ BackupType copyType, String[] options) throws IOException {
+ int res = 0;
+
+ try {
+ if (copyType == BackupType.FULL) {
+ SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
+ LOG.debug("Doing SNAPSHOT_COPY");
+ // Make a new instance of conf to be used by the snapshot copy class.
+ snapshotCp.setConf(new Configuration(conf));
+ res = snapshotCp.run(options);
+
+ } else if (copyType == BackupType.INCREMENTAL) {
+ LOG.debug("Doing COPY_TYPE_DISTCP");
+ setSubTaskPercntgInWholeTask(1f);
+
+ BackupDistCp distcp =
+ new BackupDistCp(new Configuration(conf), null, context, backupManager);
+ // Handle a special case where the source is a single file.
+ // In this case, distcp will not create the target dir. It treats the
+ // target as a file name and copies the source file to that file name.
+ // We need to create the target dir before running distcp.
+ LOG.debug("DistCp options: " + Arrays.toString(options));
+ Path dest = new Path(options[options.length - 1]);
+ FileSystem destfs = dest.getFileSystem(conf);
+ if (!destfs.exists(dest)) {
+ destfs.mkdirs(dest);
+ }
+ res = distcp.run(options);
+ }
+ return res;
+
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void cancel(String jobId) throws IOException {
+ JobID id = JobID.forName(jobId);
+ Cluster cluster = new Cluster(this.getConf());
+ try {
+ Job job = cluster.getJob(id);
+ if (job == null) {
+ LOG.error("No job found for " + id);
+ // should we throw exception
+ return;
+ }
+ if (job.isComplete() || job.isRetired()) {
+ return;
+ }
+
+ job.killJob();
+ LOG.debug("Killed copy job " + id);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
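For reference, a minimal sketch (not part of the patch) of how a caller might drive the incremental, DistCp-based copy path described above; the BackupInfo and BackupManager are assumed to come from an ongoing backup session, and the source and destination paths are placeholders.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;

public class IncrementalCopyExample {
  // BackupInfo and BackupManager are assumed to come from the surrounding session.
  static int copyWals(BackupInfo info, BackupManager manager, Configuration conf)
      throws IOException {
    MapReduceBackupCopyJob copyJob = new MapReduceBackupCopyJob();
    copyJob.setConf(conf);
    // One copy task covers the whole incremental phase (the default).
    copyJob.setSubTaskPercntgInWholeTask(1f);
    // Per BackupDistCp, only "src1 [src2 ...] dst" is supported, no other DistCp options.
    String[] options = { "/backup/staging/WALs", "hdfs://backup-target/backups/incr-001" };
    return copyJob.copy(info, manager, conf, BackupType.INCREMENTAL, options);
  }
}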
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
new file mode 100644
index 0000000..ffb61ec
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * MapReduce implementation of {@link RestoreJob}.
+ *
+ * For full backup restore, it runs an {@link HFileSplitterJob} job and creates
+ * HFiles which are aligned with the region boundaries of the table being
+ * restored; for incremental backup restore it runs {@link WALPlayer} in
+ * bulk load mode (creating HFiles from WAL edits).
+ *
+ * The resulting HFiles are then loaded using the HBase bulk load tool
+ * {@link LoadIncrementalHFiles}.
+ */
+@InterfaceAudience.Private
+public class MapReduceRestoreJob implements RestoreJob {
+ public static final Log LOG = LogFactory.getLog(MapReduceRestoreJob.class);
+
+ private Tool player;
+ private Configuration conf;
+
+ public MapReduceRestoreJob() {
+ }
+
+ @Override
+ public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
+ boolean fullBackupRestore) throws IOException {
+
+ String bulkOutputConfKey;
+
+ if (fullBackupRestore) {
+ player = new HFileSplitterJob();
+ bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
+ } else {
+ player = new WALPlayer();
+ bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
+ }
+ // The player reads all files in an arbitrary directory structure and creates
+ // a Map task for each file
+ String dirs = StringUtils.join(dirPaths, ",");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+ + " backup from directory " + dirs + " from hbase tables "
+ + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) +
+ " to tables "
+ + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
+ }
+
+ for (int i = 0; i < tableNames.length; i++) {
+
+ LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
+
+ Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+ Configuration conf = getConf();
+ conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+ String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+ int result = 0;
+ int loaderResult = 0;
+ try {
+
+ player.setConf(getConf());
+ result = player.run(playerArgs);
+ if (succeeded(result)) {
+ // do bulk load
+ LoadIncrementalHFiles loader = createLoader();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
+ }
+ String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
+ loaderResult = loader.run(args);
+
+ if (failed(loaderResult)) {
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
+ }
+ } else {
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+ }
+ LOG.debug("Restore Job finished:" + result);
+ } catch (Exception e) {
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop and HBase logs) ", e);
+ }
+
+ }
+ }
+
+ private String getFileNameCompatibleString(TableName table) {
+ return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+ }
+
+ private boolean failed(int result) {
+ return result != 0;
+ }
+
+ private boolean succeeded(int result) {
+ return result == 0;
+ }
+
+ private LoadIncrementalHFiles createLoader() throws IOException {
+ // Set configuration for restore: LoadIncrementalHFiles needs more time,
+ // so raise hbase.rpc.timeout to one hour.
+ Integer milliSecInHour = 3600000;
+ Configuration conf = new Configuration(getConf());
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
+
+ // By default, it is 32 and the loader will fail if the number of files in any region
+ // exceeds this limit. Bad for snapshot restore.
+ conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+ conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+ LoadIncrementalHFiles loader = null;
+ try {
+ loader = new LoadIncrementalHFiles(conf);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return loader;
+ }
+
+ private Path getBulkOutputDir(String tableName) throws IOException {
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+ String tmp =
+ conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+ HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ Path path =
+ new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+ + EnvironmentEdgeManager.currentTime());
+ fs.deleteOnExit(path);
+ return path;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+}
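For reference, a minimal sketch (not part of the patch) of invoking the restore job for a single full backup image; the image path and table names are placeholders and a running cluster is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;

public class RestoreFullBackupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    MapReduceRestoreJob restore = new MapReduceRestoreJob();
    restore.setConf(conf);
    // Restore the HFiles of one full backup image of "mytable" into "mytable_restored";
    // the backup image path and table names are placeholders.
    Path[] dirs = { new Path("hdfs://backup-target/backups/full-001/default/mytable") };
    TableName[] from = { TableName.valueOf("mytable") };
    TableName[] to = { TableName.valueOf("mytable_restored") };
    restore.run(dirs, from, to, true /* fullBackupRestore */);
  }
}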
http://git-wip-us.apache.org/repos/asf/hbase/blob/3aaea8e0/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
new file mode 100644
index 0000000..b5b887c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for incremental backup
+ * before deleting it when its TTL is over.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupLogCleaner extends BaseLogCleanerDelegate {
+ private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class);
+
+ private boolean stopped = false;
+ private Connection conn;
+
+ public BackupLogCleaner() {
+ }
+
+ @Override
+ public void init(Map<String, Object> params) {
+ if (params != null && params.containsKey(HMaster.MASTER)) {
+ MasterServices master = (MasterServices) params.get(HMaster.MASTER);
+ conn = master.getConnection();
+ if (getConf() == null) {
+ super.setConf(conn.getConfiguration());
+ }
+ }
+ if (conn == null) {
+ try {
+ conn = ConnectionFactory.createConnection(getConf());
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to create connection", ioe);
+ }
+ }
+ }
+
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ // all members of this class are null if backup is disabled,
+ // so we cannot filter the files
+ if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
+ LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ + " setting");
+ return files;
+ }
+
+ List<FileStatus> list = new ArrayList<FileStatus>();
+ try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+ // If we do not have recorded backup sessions
+ try {
+ if (!table.hasBackupSessions()) {
+ LOG.trace("BackupLogCleaner has no backup sessions");
+ return files;
+ }
+ } catch (TableNotFoundException tnfe) {
+ LOG.warn("backup system table is not available" + tnfe.getMessage());
+ return files;
+ }
+
+ for (FileStatus file : files) {
+ String wal = file.getPath().toString();
+ boolean logInSystemTable = table.isWALFileDeletable(wal);
+ if (logInSystemTable) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found log file in backup system table, deleting: " + wal);
+ }
+ list.add(file);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Didn't find this log in backup system table, keeping: " + wal);
+ }
+ }
+ return list;
+ } catch (IOException e) {
+ LOG.error("Failed to get backup system table table, therefore will keep all files", e);
+ // nothing to delete
+ return new ArrayList<FileStatus>();
+ }
+ }
+
+ @Override
+ public void setConf(Configuration config) {
+ // If backup is disabled, keep all members null
+ if (!config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY,
+ BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)) {
+ LOG.warn("Backup is disabled - allowing all wals to be deleted");
+ return;
+ }
+ super.setConf(config);
+ }
+
+ @Override
+ public void stop(String why) {
+ if (this.stopped) {
+ return;
+ }
+ this.stopped = true;
+ LOG.info("Stopping BackupLogCleaner");
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+}
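For reference, a minimal sketch (not part of the patch) of registering the cleaner with the master's log-cleaner plugin chain and probing it directly, e.g. from a test. It assumes a running cluster and that HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS names the plugin list; in a real deployment the master instantiates the plugin itself.

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;

public class BackupLogCleanerExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Append the cleaner to the master's log-cleaner plugin chain and enable backup.
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, "");
    conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
        plugins.isEmpty() ? BackupLogCleaner.class.getName()
            : plugins + "," + BackupLogCleaner.class.getName());
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);

    // Probe the delegate directly. With no master params, init() creates its own
    // connection from the configuration, so a reachable cluster is required.
    BackupLogCleaner cleaner = new BackupLogCleaner();
    cleaner.setConf(conf);
    cleaner.init(null);
    for (FileStatus deletable : cleaner.getDeletableFiles(new ArrayList<FileStatus>())) {
      System.out.println("Deletable WAL: " + deletable.getPath());
    }
  }
}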