Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/19 20:31:39 UTC
hbase git commit: HBASE-15448 HBase Backup Phase 3: Restore optimization 2 (Vladimir Rodionov)
Repository: hbase
Updated Branches:
refs/heads/HBASE-7912 fef921860 -> 6d1e7079f
HBASE-15448 HBase Backup Phase 3: Restore optimization 2 (Vladimir Rodionov)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d1e7079
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d1e7079
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d1e7079
Branch: refs/heads/HBASE-7912
Commit: 6d1e7079f7f5eccf426dc0dd5136681bbc8e4d52
Parents: fef9218
Author: tedyu <yu...@gmail.com>
Authored: Mon Sep 19 13:31:19 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Sep 19 13:31:19 2016 -0700
----------------------------------------------------------------------
.../backup/BackupRestoreServerFactory.java | 12 +-
.../hadoop/hbase/backup/HBackupFileSystem.java | 28 ++
.../hbase/backup/IncrementalRestoreService.java | 42 --
.../hadoop/hbase/backup/RestoreService.java | 50 +++
.../backup/impl/RestoreTablesProcedure.java | 402 -------------------
.../hbase/backup/mapreduce/HFileSplitter.java | 190 +++++++++
.../mapreduce/MapReduceRestoreService.java | 108 ++---
.../backup/master/FullTableBackupProcedure.java | 1 -
.../backup/master/RestoreTablesProcedure.java | 387 ++++++++++++++++++
.../hbase/backup/util/RestoreServerUtil.java | 149 ++++---
.../hbase/mapreduce/HFileInputFormat2.java | 174 ++++++++
.../org/apache/hadoop/hbase/master/HMaster.java | 2 +-
.../hadoop/hbase/backup/TestBackupBase.java | 2 +-
.../hbase/backup/TestIncrementalBackup.java | 34 +-
14 files changed, 1016 insertions(+), 565 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
index 25ec9d9..7644a4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java
@@ -36,15 +36,15 @@ public final class BackupRestoreServerFactory {
}
/**
- * Gets incremental restore service
+ * Gets backup restore service
* @param conf - configuration
- * @return incremental backup service instance
+ * @return backup restore service instance
*/
- public static IncrementalRestoreService getIncrementalRestoreService(Configuration conf) {
- Class<? extends IncrementalRestoreService> cls =
+ public static RestoreService getRestoreService(Configuration conf) {
+ Class<? extends RestoreService> cls =
conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreService.class,
- IncrementalRestoreService.class);
- IncrementalRestoreService service = ReflectionUtils.newInstance(cls, conf);
+ RestoreService.class);
+ RestoreService service = ReflectionUtils.newInstance(cls, conf);
service.setConf(conf);
return service;
}
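
For reference, a minimal sketch of how a caller might use the renamed factory method together with the new RestoreService.run() signature introduced below. Only getRestoreService(conf) and the run() contract come from this patch; the directory paths and table names are placeholders, and the snippet assumes the usual Configuration, Path and TableName imports:

    Configuration conf = HBaseConfiguration.create();
    RestoreService restoreService = BackupRestoreServerFactory.getRestoreService(conf);
    // Placeholder inputs for illustration only
    Path[] backupDirs = new Path[] { new Path("hdfs:///backup/full/t1") };
    TableName[] fromTables = new TableName[] { TableName.valueOf("t1") };
    TableName[] toTables = new TableName[] { TableName.valueOf("t1_restored") };
    // 'true' selects the full-backup (HFile) restore path added in this change
    restoreService.run(backupDirs, fromTables, toTables, true);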
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
index 1fc0a92..a130a9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java
@@ -20,18 +20,25 @@
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
/**
* View to an on-disk Backup Image FileSytem
@@ -77,6 +84,27 @@ public class HBackupFileSystem {
return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName));
}
+
+ public static List<HRegionInfo> loadRegionInfos(TableName tableName,
+ Path backupRootPath, String backupId, Configuration conf) throws IOException
+ {
+ Path backupTableRoot = getTableBackupPath(tableName, backupRootPath, backupId);
+ FileSystem fs = backupTableRoot.getFileSystem(conf);
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupTableRoot, true);
+ List<HRegionInfo> infos = new ArrayList<HRegionInfo>();
+ while(it.hasNext()) {
+ LocatedFileStatus lfs = it.next();
+ if(lfs.isFile() && lfs.getPath().toString().endsWith(HRegionFileSystem.REGION_INFO_FILE)) {
+ Path regionDir = lfs.getPath().getParent();
+ HRegionInfo info = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+ infos.add(info);
+ }
+ }
+
+ Collections.sort(infos);
+ return infos;
+ }
+
/**
* Given the backup root dir and the backup id, return the log file location for an incremental
* backup.
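
The new loadRegionInfos() helper above walks the backup image and rebuilds the sorted region list from the persisted .regioninfo files. A quick sketch of calling it; the root path, backup id and conf variable are placeholders, and Bytes is the standard org.apache.hadoop.hbase.util.Bytes:

    Path backupRoot = new Path("hdfs:///backup");       // placeholder root
    String backupId = "backup_1474316000000";           // placeholder id
    List<HRegionInfo> regions = HBackupFileSystem.loadRegionInfos(
        TableName.valueOf("t1"), backupRoot, backupId, conf);
    for (HRegionInfo hri : regions) {
      System.out.println(hri.getRegionNameAsString()
          + " start=" + Bytes.toStringBinary(hri.getStartKey()));
    }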
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
deleted file mode 100644
index ae48480..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface IncrementalRestoreService extends Configurable{
-
- /**
- * Run restore operation
- * @param logDirectoryPaths - path array of WAL log directories
- * @param fromTables - from tables
- * @param toTables - to tables
- * @throws IOException
- */
- public void run(Path[] logDirectoryPaths, TableName[] fromTables, TableName[] toTables)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
new file mode 100644
index 0000000..2da98c2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+/**
+ * Backup restore service interface
+ * Concrete implementation is provided by backup provider.
+ */
+
+public interface RestoreService extends Configurable{
+
+ /**
+ * Run restore operation
+ * @param dirPaths - path array of WAL log directories
+ * @param fromTables - from tables
+ * @param toTables - to tables
+ * @param fullBackupRestore - full backup restore
+ * @throws IOException
+ */
+ public void run(Path[] dirPaths, TableName[] fromTables,
+ TableName[] toTables, boolean fullBackupRestore)
+ throws IOException;
+}
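
As the interface javadoc notes, the concrete implementation is supplied by the backup provider (MapReduceRestoreService below). A bare-bones skeleton, shown only to illustrate the contract of the hypothetical example class NoOpRestoreService:

    public class NoOpRestoreService implements RestoreService {
      private Configuration conf;

      @Override
      public void run(Path[] dirPaths, TableName[] fromTables,
          TableName[] toTables, boolean fullBackupRestore) throws IOException {
        // A real provider replays HFiles (full restore) or WALs (incremental) here
      }

      @Override
      public void setConf(Configuration conf) { this.conf = conf; }

      @Override
      public Configuration getConf() { return conf; }
    }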
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java
deleted file mode 100644
index 7ac11de..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState;
-import org.apache.hadoop.security.UserGroupInformation;
-
-@InterfaceAudience.Private
-public class RestoreTablesProcedure
- extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState>
- implements TableProcedureInterface {
- private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class);
-
- private final AtomicBoolean aborted = new AtomicBoolean(false);
- private Configuration conf;
- private String backupId;
- private List<TableName> sTableList;
- private List<TableName> tTableList;
- private String targetRootDir;
- private boolean isOverwrite;
-
- public RestoreTablesProcedure() {
- // Required by the Procedure framework to create the procedure on replay
- }
-
- public RestoreTablesProcedure(final MasterProcedureEnv env,
- final String targetRootDir, String backupId, List<TableName> sTableList,
- List<TableName> tTableList, boolean isOverwrite) throws IOException {
- this.targetRootDir = targetRootDir;
- this.backupId = backupId;
- this.sTableList = sTableList;
- this.tTableList = tTableList;
- if (tTableList == null || tTableList.isEmpty()) {
- this.tTableList = sTableList;
- }
- this.isOverwrite = isOverwrite;
- this.setOwner(env.getRequestUser().getUGI().getShortUserName());
- }
-
- @Override
- public byte[] getResult() {
- return null;
- }
-
- /**
- * Validate target Tables
- * @param conn connection
- * @param mgr table state manager
- * @param tTableArray: target tables
- * @param isOverwrite overwrite existing table
- * @throws IOException exception
- */
- private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray,
- boolean isOverwrite)
- throws IOException {
- ArrayList<TableName> existTableList = new ArrayList<>();
- ArrayList<TableName> disabledTableList = new ArrayList<>();
-
- // check if the tables already exist
- for (TableName tableName : tTableArray) {
- if (MetaTableAccessor.tableExists(conn, tableName)) {
- existTableList.add(tableName);
- if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) {
- disabledTableList.add(tableName);
- }
- } else {
- LOG.info("HBase table " + tableName
- + " does not exist. It will be created during restore process");
- }
- }
-
- if (existTableList.size() > 0) {
- if (!isOverwrite) {
- LOG.error("Existing table (" + existTableList + ") found in the restore target, please add "
- + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
- throw new IOException("Existing table found in target while no \"-overwrite\" "
- + "option found");
- } else {
- if (disabledTableList.size() > 0) {
- LOG.error("Found offline table in the restore target, "
- + "please enable them before restore with \"-overwrite\" option");
- LOG.info("Offline table list in restore target: " + disabledTableList);
- throw new IOException(
- "Found offline table in the target when restore with \"-overwrite\" option");
- }
- }
- }
- }
-
- /**
- * Restore operation handle each backupImage in iterator
- * @param conn the Connection
- * @param it: backupImage iterator - ascending
- * @param sTable: table to be restored
- * @param tTable: table to be restored to
- * @param truncateIfExists truncate table if it exists
- * @throws IOException exception
- */
- private void restoreImages(MasterServices svc, Iterator<BackupImage> it, TableName sTable,
- TableName tTable, boolean truncateIfExists) throws IOException {
-
- // First image MUST be image of a FULL backup
- BackupImage image = it.next();
-
- String rootDir = image.getRootDir();
- String backupId = image.getBackupId();
- Path backupRoot = new Path(rootDir);
-
- // We need hFS only for full restore (see the code)
- RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
- BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
-
- Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
-
- // TODO: convert feature will be provided in a future JIRA
- boolean converted = false;
- String lastIncrBackupId = null;
- List<String> logDirList = null;
-
- // Scan incremental backups
- if (it.hasNext()) {
- // obtain the backupId for most recent incremental
- logDirList = new ArrayList<String>();
- while (it.hasNext()) {
- BackupImage im = it.next();
- String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
- logDirList.add(logBackupDir);
- lastIncrBackupId = im.getBackupId();
- }
- }
- if (manifest.getType() == BackupType.FULL || converted) {
- LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from "
- + (converted ? "converted" : "full") + " backup image " + tableBackupPath.toString());
- restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable,
- converted, truncateIfExists, lastIncrBackupId);
- } else { // incremental Backup
- throw new IOException("Unexpected backup type " + image.getType());
- }
-
- // The rest one are incremental
- if (logDirList != null) {
- String logDirs = StringUtils.join(logDirList, ",");
- LOG.info("Restoring '" + sTable + "' to '" + tTable
- + "' from log dirs: " + logDirs);
- String[] sarr = new String[logDirList.size()];
- logDirList.toArray(sarr);
- Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr);
- restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths,
- new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId);
- }
- LOG.info(sTable + " has been successfully restored to " + tTable);
- }
-
- /**
- * Restore operation. Stage 2: resolved Backup Image dependency
- * @param svc MasterServices
- * @param backupManifestMap : tableName, Manifest
- * @param sTableArray The array of tables to be restored
- * @param tTableArray The array of mapping tables to restore to
- * @param isOverwrite overwrite
- * @return set of BackupImages restored
- * @throws IOException exception
- */
- private void restoreStage(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap,
- TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
- TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
- boolean truncateIfExists = isOverwrite;
- try {
- for (int i = 0; i < sTableArray.length; i++) {
- TableName table = sTableArray[i];
- BackupManifest manifest = backupManifestMap.get(table);
- // Get the image list of this backup for restore in time order from old
- // to new.
- List<BackupImage> list = new ArrayList<BackupImage>();
- list.add(manifest.getBackupImage());
- List<BackupImage> depList = manifest.getDependentListByTable(table);
- list.addAll(depList);
- TreeSet<BackupImage> restoreList = new TreeSet<BackupImage>(list);
- LOG.debug("need to clear merged Image. to be implemented in future jira");
- restoreImages(svc, restoreList.iterator(), table, tTableArray[i], truncateIfExists);
- restoreImageSet.addAll(restoreList);
-
- if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
- LOG.info("Restore includes the following image(s):");
- for (BackupImage image : restoreImageSet) {
- LOG.info("Backup: "
- + image.getBackupId()
- + " "
- + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
- table));
- }
- }
- }
- } catch (Exception e) {
- LOG.error("Failed", e);
- throw new IOException(e);
- }
- LOG.debug("restoreStage finished");
- }
-
- @Override
- protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state)
- throws InterruptedException {
- if (conf == null) {
- conf = env.getMasterConfiguration();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(this + " execute state=" + state);
- }
- TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]);
- try {
- switch (state) {
- case VALIDATION:
-
- // check the target tables
- checkTargetTables(env.getMasterServices().getConnection(),
- env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite);
-
- setNextState(RestoreTablesState.RESTORE_IMAGES);
- break;
- case RESTORE_IMAGES:
- TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]);
- HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
- // check and load backup image manifest for the tables
- Path rootPath = new Path(targetRootDir);
- HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
- backupId);
- restoreStage(env.getMasterServices(), backupManifestMap, sTableArray,
- tTableArray, isOverwrite);
-
- return Flow.NO_MORE_STATE;
-
- default:
- throw new UnsupportedOperationException("unhandled state=" + state);
- }
- } catch (IOException e) {
- setFailure("restore-table", e);
- }
- return Flow.HAS_MORE_STATE;
- }
-
- @Override
- protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state)
- throws IOException {
- }
-
- @Override
- protected RestoreTablesState getState(final int stateId) {
- return RestoreTablesState.valueOf(stateId);
- }
-
- @Override
- protected int getStateId(final RestoreTablesState state) {
- return state.getNumber();
- }
-
- @Override
- protected RestoreTablesState getInitialState() {
- return RestoreTablesState.VALIDATION;
- }
-
- @Override
- protected void setNextState(final RestoreTablesState state) {
- if (aborted.get()) {
- setAbortFailure("snapshot-table", "abort requested");
- } else {
- super.setNextState(state);
- }
- }
-
- @Override
- public boolean abort(final MasterProcedureEnv env) {
- aborted.set(true);
- return true;
- }
-
- @Override
- public void toStringClassDetails(StringBuilder sb) {
- sb.append(getClass().getSimpleName());
- sb.append(" (targetRootDir=");
- sb.append(targetRootDir);
- sb.append(" isOverwrite= ");
- sb.append(isOverwrite);
- sb.append(" backupId= ");
- sb.append(backupId);
- sb.append(")");
- }
-
- MasterProtos.RestoreTablesRequest toRestoreTables() {
- MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder();
- bldr.setOverwrite(isOverwrite).setBackupId(backupId);
- bldr.setBackupRootDir(targetRootDir);
- for (TableName table : sTableList) {
- bldr.addTables(ProtobufUtil.toProtoTableName(table));
- }
- for (TableName table : tTableList) {
- bldr.addTargetTables(ProtobufUtil.toProtoTableName(table));
- }
- return bldr.build();
- }
-
- @Override
- public void serializeStateData(final OutputStream stream) throws IOException {
- super.serializeStateData(stream);
-
- MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables();
- restoreTables.writeDelimitedTo(stream);
- }
-
- @Override
- public void deserializeStateData(final InputStream stream) throws IOException {
- super.deserializeStateData(stream);
-
- MasterProtos.RestoreTablesRequest proto =
- MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream);
- backupId = proto.getBackupId();
- targetRootDir = proto.getBackupRootDir();
- isOverwrite = proto.getOverwrite();
- sTableList = new ArrayList<>(proto.getTablesList().size());
- for (HBaseProtos.TableName table : proto.getTablesList()) {
- sTableList.add(ProtobufUtil.toTableName(table));
- }
- tTableList = new ArrayList<>(proto.getTargetTablesList().size());
- for (HBaseProtos.TableName table : proto.getTargetTablesList()) {
- tTableList.add(ProtobufUtil.toTableName(table));
- }
- }
-
- @Override
- public TableName getTableName() {
- return TableName.BACKUP_TABLE_NAME;
- }
-
- @Override
- public TableOperationType getTableOperationType() {
- return TableOperationType.RESTORE;
- }
-
- @Override
- protected boolean acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) {
- return false;
- }
- return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
- }
-
- @Override
- protected void releaseLock(final MasterProcedureEnv env) {
- env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
new file mode 100644
index 0000000..c69a335
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as an M/R job.
+ * The tool generates HFiles for later bulk importing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HFileSplitter extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(HFileSplitter.class);
+ final static String NAME = "HFileSplitter";
+ public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+ public final static String TABLES_KEY = "hfile.input.tables";
+ public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ public HFileSplitter(){
+ }
+
+ protected HFileSplitter(final Configuration c) {
+ super(c);
+ }
+
+ /**
+ * A mapper that just writes out cells.
+ * This one can be used together with {@link KeyValueSortReducer}
+ */
+ static class HFileCellMapper
+ extends Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+ @Override
+ public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+ InterruptedException {
+ // Convert value to KeyValue if subclass
+ if (!value.getClass().equals(KeyValue.class)) {
+ value =
+ new KeyValue(value.getRowArray(), value.getRowOffset(), (int) value.getRowLength(),
+ value.getFamilyArray(), value.getFamilyOffset(), (int) value.getFamilyLength(),
+ value.getQualifierArray(), value.getQualifierOffset(),
+ (int) value.getQualifierLength(), value.getTimestamp(), Type.codeToType(value
+ .getTypeByte()), value.getValueArray(), value.getValueOffset(),
+ value.getValueLength());
+ }
+ context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ // do nothing
+ }
+ }
+
+
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Configuration conf = getConf();
+ String inputDirs = args[0];
+ String tabName = args[1];
+ conf.setStrings(TABLES_KEY, tabName);
+ Job job =
+ Job.getInstance(conf,
+ conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+ job.setJarByClass(HFileSplitter.class);
+ FileInputFormat.addInputPaths(job, inputDirs);
+ job.setInputFormatClass(HFileInputFormat2.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ if (hfileOutPath != null) {
+ LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+ TableName tableName = TableName.valueOf(tabName);
+ job.setMapperClass(HFileCellMapper.class);
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputValueClass(KeyValue.class);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+ }
+ LOG.debug("success configuring load incremental job");
+
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ com.google.common.base.Preconditions.class);
+ } else {
+ throw new IOException("No bulk output directory specified");
+ }
+ return job;
+ }
+
+
+ /**
+ * Print usage
+ * @param errorMsg Error message. Can be null.
+ */
+ private void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+ System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+ System.err.println("<table> table to load.\n");
+ System.err.println("To generate HFiles for a bulk data load, pass the option:");
+ System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err.println("Other options:");
+ System.err.println(" -D " + JOB_NAME_CONF_KEY
+ + "=jobName - use the specified mapreduce job name for the HFile splitter");
+ System.err.println("For performance also consider the following options:\n"
+ + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
+ }
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ System.exit(-1);
+ }
+ Job job = createSubmittableJob(args);
+ int result =job.waitForCompletion(true) ? 0 : 1;
+ return result;
+ }
+}
+
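
Because HFileSplitter is a standard Hadoop Tool, it can be driven the same way main() does. A hedged example of a programmatic invocation; the input directory, output directory and table name are placeholders:

    Configuration conf = HBaseConfiguration.create();
    // BULK_OUTPUT_CONF_KEY must be set, otherwise createSubmittableJob() throws
    conf.set(HFileSplitter.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-splitter-out");
    String[] args = { "/backup/default/t1/archive", "t1" };  // <HFile inputdir(s)> <table>
    int exitCode = ToolRunner.run(conf, new HFileSplitter(), args);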
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
index c47d6ed..18c1f86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
@@ -1,13 +1,13 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,54 +24,67 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
+import org.apache.hadoop.hbase.backup.RestoreService;
import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class MapReduceRestoreService implements IncrementalRestoreService {
+public class MapReduceRestoreService implements RestoreService {
public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);
- private WALPlayer player;
+ private Tool player;
+ private Configuration conf;
public MapReduceRestoreService() {
- this.player = new WALPlayer();
}
@Override
- public void run(Path[] logDirPaths, TableName[] tableNames, TableName[] newTableNames)
- throws IOException {
+ public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
+ boolean fullBackupRestore) throws IOException {
- // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each
- // log file
- String logDirs = StringUtils.join(logDirPaths, ",");
- LOG.info("Restore incremental backup from directory " + logDirs + " from hbase tables "
- + BackupServerUtil.join(tableNames) + " to tables " + BackupServerUtil.join(newTableNames));
+ String bulkOutputConfKey;
+
+ if (fullBackupRestore) {
+ player = new HFileSplitter();
+ bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
+ } else {
+ player = new WALPlayer();
+ bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
+ }
+ // Player reads all files in arbitrary directory structure and creates
+ // a Map task for each file
+ String dirs = StringUtils.join(dirPaths, ",");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+ + " backup from directory " + dirs + " from hbase tables "
+ + BackupServerUtil.join(tableNames) + " to tables "
+ + BackupServerUtil.join(newTableNames));
+ }
for (int i = 0; i < tableNames.length; i++) {
-
- LOG.info("Restore "+ tableNames[i] + " into "+ newTableNames[i]);
-
+
+ LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
+
Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
- String[] playerArgs =
- { logDirs, tableNames[i].getNameAsString() };
+ Configuration conf = getConf();
+ conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+ String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
int result = 0;
int loaderResult = 0;
try {
- Configuration conf = getConf();
- conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
- player.setConf(getConf());
+
+ player.setConf(getConf());
result = player.run(playerArgs);
if (succeeded(result)) {
// do bulk load
@@ -81,38 +94,37 @@ public class MapReduceRestoreService implements IncrementalRestoreService {
}
String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
loaderResult = loader.run(args);
- if(failed(loaderResult)) {
- throw new IOException("Can not restore from backup directory " + logDirs
- + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
+
+ if (failed(loaderResult)) {
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
}
} else {
- throw new IOException("Can not restore from backup directory " + logDirs
- + " (check Hadoop/MR and HBase logs). WALPlayer return code =" + result);
+ throw new IOException("Can not restore from backup directory " + dirs
+ + " (check Hadoop/MR and HBase logs). Player return code =" + result);
}
LOG.debug("Restore Job finished:" + result);
} catch (Exception e) {
- throw new IOException("Can not restore from backup directory " + logDirs
+ throw new IOException("Can not restore from backup directory " + dirs
+ " (check Hadoop and HBase logs) ", e);
}
}
}
- private String getFileNameCompatibleString(TableName table)
- {
- return table.getNamespaceAsString() +"-"+ table.getQualifierAsString();
+ private String getFileNameCompatibleString(TableName table) {
+ return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
}
-
+
private boolean failed(int result) {
return result != 0;
}
-
+
private boolean succeeded(int result) {
return result == 0;
}
- private LoadIncrementalHFiles createLoader()
- throws IOException {
+ private LoadIncrementalHFiles createLoader() throws IOException {
// set configuration for restore:
// LoadIncrementalHFile needs more time
// <name>hbase.rpc.timeout</name> <value>600000</value>
@@ -120,10 +132,11 @@ public class MapReduceRestoreService implements IncrementalRestoreService {
Integer milliSecInHour = 3600000;
Configuration conf = new Configuration(getConf());
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
-
+
// By default, it is 32 and loader will fail if # of files in any region exceed this
// limit. Bad for snapshot restore.
conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+ conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
LoadIncrementalHFiles loader = null;
try {
loader = new LoadIncrementalHFiles(conf);
@@ -133,27 +146,26 @@ public class MapReduceRestoreService implements IncrementalRestoreService {
return loader;
}
- private Path getBulkOutputDir(String tableName) throws IOException
- {
+ private Path getBulkOutputDir(String tableName) throws IOException {
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
- String tmp = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- Path path = new Path(tmp + Path.SEPARATOR + "bulk_output-"+tableName + "-"
- + EnvironmentEdgeManager.currentTime());
+ String tmp =
+ conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ Path path =
+ new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+ + EnvironmentEdgeManager.currentTime());
fs.deleteOnExit(path);
return path;
}
-
@Override
public Configuration getConf() {
- return player.getConf();
+ return conf;
}
@Override
public void setConf(Configuration conf) {
- this.player.setConf(conf);
+ this.conf = conf;
}
}
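
The key change above is that the service now picks its M/R player per request: HFileSplitter for a full-backup image and WALPlayer for incremental WAL directories, each writing HFiles to a temporary bulk-output directory that LoadIncrementalHFiles then loads. In caller terms (directory arrays are placeholders):

    RestoreService svc = BackupRestoreServerFactory.getRestoreService(conf);
    // Full restore: dirPaths point at the HFile directories of the full backup image
    svc.run(hfileDirs, fromTables, toTables, true);
    // Incremental restore: dirPaths point at the backed-up WAL directories
    svc.run(walDirs, fromTables, toTables, false);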
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
index c56aaf3..94e991f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.security.UserGroupInformation;
@InterfaceAudience.Private
public class FullTableBackupProcedure
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
new file mode 100644
index 0000000..2678278
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState;
+
+@InterfaceAudience.Private
+public class RestoreTablesProcedure
+ extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState>
+ implements TableProcedureInterface {
+ private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class);
+
+ private final AtomicBoolean aborted = new AtomicBoolean(false);
+ private Configuration conf;
+ private String backupId;
+ private List<TableName> sTableList;
+ private List<TableName> tTableList;
+ private String targetRootDir;
+ private boolean isOverwrite;
+
+ public RestoreTablesProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ }
+
+ public RestoreTablesProcedure(final MasterProcedureEnv env,
+ final String targetRootDir, String backupId, List<TableName> sTableList,
+ List<TableName> tTableList, boolean isOverwrite) throws IOException {
+ this.targetRootDir = targetRootDir;
+ this.backupId = backupId;
+ this.sTableList = sTableList;
+ this.tTableList = tTableList;
+ if (tTableList == null || tTableList.isEmpty()) {
+ this.tTableList = sTableList;
+ }
+ this.isOverwrite = isOverwrite;
+ this.setOwner(env.getRequestUser().getUGI().getShortUserName());
+ }
+
+ @Override
+ public byte[] getResult() {
+ return null;
+ }
+
+ /**
+ * Validate target Tables
+ * @param conn connection
+ * @param mgr table state manager
+ * @param tTableArray: target tables
+ * @param isOverwrite overwrite existing table
+ * @throws IOException exception
+ */
+ private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray,
+ boolean isOverwrite)
+ throws IOException {
+ ArrayList<TableName> existTableList = new ArrayList<>();
+ ArrayList<TableName> disabledTableList = new ArrayList<>();
+
+ // check if the tables already exist
+ for (TableName tableName : tTableArray) {
+ if (MetaTableAccessor.tableExists(conn, tableName)) {
+ existTableList.add(tableName);
+ if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) {
+ disabledTableList.add(tableName);
+ }
+ } else {
+ LOG.info("HBase table " + tableName
+ + " does not exist. It will be created during restore process");
+ }
+ }
+
+ if (existTableList.size() > 0) {
+ if (!isOverwrite) {
+ LOG.error("Existing table (" + existTableList + ") found in the restore target, please add "
+ + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
+ throw new IOException("Existing table found in target while no \"-overwrite\" "
+ + "option found");
+ } else {
+ if (disabledTableList.size() > 0) {
+ LOG.error("Found offline table in the restore target, "
+ + "please enable them before restore with \"-overwrite\" option");
+ LOG.info("Offline table list in restore target: " + disabledTableList);
+ throw new IOException(
+ "Found offline table in the target when restore with \"-overwrite\" option");
+ }
+ }
+ }
+ }
+
+ /**
+ * Restore operation handle each backupImage in array
+ * @param svc: master services
+ * @param images: array BackupImage
+ * @param sTable: table to be restored
+ * @param tTable: table to be restored to
+ * @param truncateIfExists: truncate table
+ * @throws IOException exception
+ */
+
+ private void restoreImages(MasterServices svc, BackupImage[] images, TableName sTable, TableName tTable,
+ boolean truncateIfExists) throws IOException {
+
+ // First image MUST be image of a FULL backup
+ BackupImage image = images[0];
+ String rootDir = image.getRootDir();
+ String backupId = image.getBackupId();
+ Path backupRoot = new Path(rootDir);
+ RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
+ Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
+ String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
+ // We need hFS only for full restore (see the code)
+ BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+ if (manifest.getType() == BackupType.FULL) {
+ LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full"
+ + " backup image " + tableBackupPath.toString());
+ restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, truncateIfExists,
+ lastIncrBackupId);
+ } else { // incremental Backup
+ throw new IOException("Unexpected backup type " + image.getType());
+ }
+
+ if (images.length == 1) {
+ // full backup restore done
+ return;
+ }
+
+ List<Path> dirList = new ArrayList<Path>();
+ // add full backup path
+ // full backup path comes first
+ for (int i = 1; i < images.length; i++) {
+ BackupImage im = images[i];
+ String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+ dirList.add(new Path(logBackupDir));
+ }
+
+ String dirs = StringUtils.join(dirList, ",");
+ LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
+ Path[] paths = new Path[dirList.size()];
+ dirList.toArray(paths);
+ restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, new TableName[] { sTable },
+ new TableName[] { tTable }, lastIncrBackupId);
+ LOG.info(sTable + " has been successfully restored to " + tTable);
+
+ }
+
+ /**
+ * Restore operation. Stage 2: resolved Backup Image dependency
+ * @param svc: master services
+ * @param backupManifestMap : tableName, Manifest
+ * @param sTableArray The array of tables to be restored
+ * @param tTableArray The array of mapping tables to restore to
+ * @return set of BackupImages restored
+ * @throws IOException exception
+ */
+ private void restore(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap,
+ TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
+ TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+ boolean truncateIfExists = isOverwrite;
+ try {
+ for (int i = 0; i < sTableArray.length; i++) {
+ TableName table = sTableArray[i];
+ BackupManifest manifest = backupManifestMap.get(table);
+ // Get the image list of this backup for restore in time order from old
+ // to new.
+ List<BackupImage> list = new ArrayList<BackupImage>();
+ list.add(manifest.getBackupImage());
+ TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+ List<BackupImage> depList = manifest.getDependentListByTable(table);
+ set.addAll(depList);
+ BackupImage[] arr = new BackupImage[set.size()];
+ set.toArray(arr);
+ restoreImages(svc, arr, table, tTableArray[i], truncateIfExists);
+ restoreImageSet.addAll(list);
+ if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+ LOG.info("Restore includes the following image(s):");
+ for (BackupImage image : restoreImageSet) {
+ LOG.info("Backup: "
+ + image.getBackupId()
+ + " "
+ + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+ table));
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed", e);
+ throw new IOException(e);
+ }
+ LOG.debug("restoreStage finished");
+ }
+
+ @Override
+ protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state)
+ throws InterruptedException {
+ if (conf == null) {
+ conf = env.getMasterConfiguration();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " execute state=" + state);
+ }
+ TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]);
+ try {
+ switch (state) {
+ case VALIDATION:
+
+ // check the target tables
+ checkTargetTables(env.getMasterServices().getConnection(),
+ env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite);
+
+ setNextState(RestoreTablesState.RESTORE_IMAGES);
+ break;
+ case RESTORE_IMAGES:
+ TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]);
+ HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+ // check and load backup image manifest for the tables
+ Path rootPath = new Path(targetRootDir);
+ HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
+ backupId);
+ restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray, isOverwrite);
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ } catch (IOException e) {
+ setFailure("restore-table", e);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state)
+ throws IOException {
+ }
+
+ @Override
+ protected RestoreTablesState getState(final int stateId) {
+ return RestoreTablesState.valueOf(stateId);
+ }
+
+ @Override
+ protected int getStateId(final RestoreTablesState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected RestoreTablesState getInitialState() {
+ return RestoreTablesState.VALIDATION;
+ }
+
+ @Override
+ protected void setNextState(final RestoreTablesState state) {
+ if (aborted.get()) {
+ setAbortFailure("snapshot-table", "abort requested");
+ } else {
+ super.setNextState(state);
+ }
+ }
+
+ @Override
+ public boolean abort(final MasterProcedureEnv env) {
+ aborted.set(true);
+ return true;
+ }
+
+ @Override
+ public void toStringClassDetails(StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ sb.append(" (targetRootDir=");
+ sb.append(targetRootDir);
+ sb.append(" isOverwrite= ");
+ sb.append(isOverwrite);
+ sb.append(" backupId= ");
+ sb.append(backupId);
+ sb.append(")");
+ }
+
+ MasterProtos.RestoreTablesRequest toRestoreTables() {
+ MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder();
+ bldr.setOverwrite(isOverwrite).setBackupId(backupId);
+ bldr.setBackupRootDir(targetRootDir);
+ for (TableName table : sTableList) {
+ bldr.addTables(ProtobufUtil.toProtoTableName(table));
+ }
+ for (TableName table : tTableList) {
+ bldr.addTargetTables(ProtobufUtil.toProtoTableName(table));
+ }
+ return bldr.build();
+ }
+
+ @Override
+ public void serializeStateData(final OutputStream stream) throws IOException {
+ super.serializeStateData(stream);
+
+ MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables();
+ restoreTables.writeDelimitedTo(stream);
+ }
+
+ @Override
+ public void deserializeStateData(final InputStream stream) throws IOException {
+ super.deserializeStateData(stream);
+
+ MasterProtos.RestoreTablesRequest proto =
+ MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream);
+ backupId = proto.getBackupId();
+ targetRootDir = proto.getBackupRootDir();
+ isOverwrite = proto.getOverwrite();
+ sTableList = new ArrayList<>(proto.getTablesList().size());
+ for (HBaseProtos.TableName table : proto.getTablesList()) {
+ sTableList.add(ProtobufUtil.toTableName(table));
+ }
+ tTableList = new ArrayList<>(proto.getTargetTablesList().size());
+ for (HBaseProtos.TableName table : proto.getTargetTablesList()) {
+ tTableList.add(ProtobufUtil.toTableName(table));
+ }
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.BACKUP_TABLE_NAME;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.RESTORE;
+ }
+
+ @Override
+ protected boolean acquireLock(final MasterProcedureEnv env) {
+ if (env.waitInitialized(this)) {
+ return false;
+ }
+ return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+ }
+
+ @Override
+ protected void releaseLock(final MasterProcedureEnv env) {
+ env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+ }
+}
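A note on the state persistence above: serializeStateData() writes the RestoreTablesRequest with writeDelimitedTo(), and deserializeStateData() reads it back with parseDelimitedFrom(), so the message carries its own length prefix on the procedure store stream. A minimal round-trip sketch of that pattern, assuming hbase-protocol and its protobuf classes are on the classpath, and using HBaseProtos.TableName only as a stand-in message (the values are illustrative, not from the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;

public class DelimitedProtoSketch {
  public static void main(String[] args) throws Exception {
    // Build a small protobuf message (stand-in for the real RestoreTablesRequest).
    HBaseProtos.TableName table = HBaseProtos.TableName.newBuilder()
        .setNamespace(ByteString.copyFromUtf8("default"))
        .setQualifier(ByteString.copyFromUtf8("t1"))
        .build();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Length-prefixed write, as serializeStateData() does for the request.
    table.writeDelimitedTo(out);
    // Length-prefixed read, as deserializeStateData() does.
    HBaseProtos.TableName back =
        HBaseProtos.TableName.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(back.getQualifier().toStringUtf8());   // prints "t1"
  }
}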
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index 37bfcc2..3da7860 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -41,10 +41,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
+import org.apache.hadoop.hbase.backup.RestoreService;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
@@ -115,6 +114,7 @@ public class RestoreServerUtil {
*/
Path getTableArchivePath(TableName tableName)
throws IOException {
+
Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath,
backupId), HConstants.HFILE_ARCHIVE_DIRECTORY);
Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
@@ -148,8 +148,33 @@ public class RestoreServerUtil {
return regionDirList;
}
+ /**
+ * Gets region list
+ * @param tableName table name
+ * @param backupId backup id
+ * @return RegionList region list
+ * @throws FileNotFoundException exception
+ * @throws IOException exception
+ */
+ ArrayList<Path> getRegionList(TableName tableName, String backupId) throws FileNotFoundException,
+ IOException {
+ Path tableArchivePath =
+ new Path(BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
+ backupId, tableName));
+
+ ArrayList<Path> regionDirList = new ArrayList<Path>();
+ FileStatus[] children = fs.listStatus(tableArchivePath);
+ for (FileStatus childStatus : children) {
+      // each child is a region directory (the directory name is the region name)
+ Path child = childStatus.getPath();
+ regionDirList.add(child);
+ }
+ return regionDirList;
+ }
+
static void modifyTableSync(MasterServices svc, HTableDescriptor desc) throws IOException {
svc.modifyTable(desc.getTableName(), desc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ @SuppressWarnings("serial")
Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
setFirst(0);
setSecond(0);
@@ -234,16 +259,16 @@ public class RestoreServerUtil {
LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
}
}
- IncrementalRestoreService restoreService =
- BackupRestoreServerFactory.getIncrementalRestoreService(conf);
+ RestoreService restoreService =
+ BackupRestoreServerFactory.getRestoreService(conf);
- restoreService.run(logDirs, tableNames, newTableNames);
+ restoreService.run(logDirs, tableNames, newTableNames, false);
}
public void fullRestoreTable(MasterServices svc, Path tableBackupPath, TableName tableName,
- TableName newTableName, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
+ TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
throws IOException {
- restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, converted, truncateIfExists,
+ restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, truncateIfExists,
lastIncrBackupId);
}
@@ -355,20 +380,19 @@ public class RestoreServerUtil {
if (lastIncrBackupId != null) {
String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
lastIncrBackupId, tableName);
- // Path target = new Path(info.getBackupStatus(tableName).getTargetDir());
return FSTableDescriptors.getTableDescriptorFromFs(fileSys,
new Path(target)).getHTableDescriptor();
}
return null;
}
- private void restoreTableAndCreate(MasterServices svc, TableName tableName, TableName newTableName,
- Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
- throws IOException {
+ private void restoreTableAndCreate(MasterServices svc, TableName tableName,
+ TableName newTableName, Path tableBackupPath, boolean truncateIfExists,
+ String lastIncrBackupId) throws IOException {
if (newTableName == null || newTableName.equals("")) {
newTableName = tableName;
}
-
+ boolean fullBackupRestoreOnly = lastIncrBackupId == null;
FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
@@ -384,7 +408,7 @@ public class RestoreServerUtil {
if (snapshotMap.get(tableName) != null) {
SnapshotDescription desc =
SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
- SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc);
+ SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
tableDescriptor = manifest.getTableDescriptor();
LOG.debug("obtained descriptor from " + manifest);
} else {
@@ -395,9 +419,9 @@ public class RestoreServerUtil {
if (tableDescriptor == null) {
LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost");
}
- } else if (converted) {
- // first check if this is a converted backup image
- LOG.error("convert will be supported in a future jira");
+ } else {
+ throw new IOException("Table snapshot directory: " + tableSnapshotPath
+ + " does not exist.");
}
}
@@ -405,13 +429,13 @@ public class RestoreServerUtil {
if (tableArchivePath == null) {
if (tableDescriptor != null) {
// find table descriptor but no archive dir => the table is empty, create table and exit
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("find table descriptor but no archive dir for table " + tableName
+ ", will only create table");
}
tableDescriptor.setName(newTableName);
- checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null,
- tableDescriptor, truncateIfExists);
+ checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, tableDescriptor,
+ truncateIfExists);
return;
} else {
throw new IllegalStateException("Cannot restore hbase table because directory '"
@@ -426,50 +450,61 @@ public class RestoreServerUtil {
tableDescriptor.setName(newTableName);
}
- if (!converted) {
- // record all region dirs:
- // load all files in dir
- try {
- ArrayList<Path> regionPathList = getRegionList(tableName);
-
- // should only try to create the table with all region informations, so we could pre-split
- // the regions in fine grain
- checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList,
- tableDescriptor, truncateIfExists);
- if (tableArchivePath != null) {
- // start real restore through bulkload
- // if the backup target is on local cluster, special action needed
- Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
- if (tempTableArchivePath.equals(tableArchivePath)) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
- }
- } else {
- regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
- if(LOG.isDebugEnabled()) {
- LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
- }
- }
+ // record all region dirs:
+ // load all files in dir
+ try {
+      // Region splits as of the last incremental backup id;
+      // we use them to create the table pre-split.
+ ArrayList<Path> regionPathList =
+ fullBackupRestoreOnly ? getRegionList(tableName) : getRegionList(tableName,
+ lastIncrBackupId);
+
+      // create the table with all region information available, so we can pre-split
+      // the regions at a fine granularity
+ checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList,
+ tableDescriptor, truncateIfExists);
+
+ // Now get region splits from full backup
+ regionPathList = getRegionList(tableName);
+
+ // start real restore through bulkload
+ // if the backup target is on local cluster, special action needed
+ Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+ if (tempTableArchivePath.equals(tableArchivePath)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+ }
+ } else {
+ regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+ }
+ }
- LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
- for (Path regionPath : regionPathList) {
- String regionName = regionPath.toString();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Restoring HFiles from directory " + regionName);
- }
- String[] args = { regionName, newTableName.getNameAsString() };
- loader.run(args);
+ if (fullBackupRestoreOnly) {
+ LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+ for (Path regionPath : regionPathList) {
+ String regionName = regionPath.toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring HFiles from directory " + regionName);
}
+ String[] args = { regionName, newTableName.getNameAsString() };
+ loader.run(args);
}
- // we do not recovered edits
- } catch (Exception e) {
- throw new IllegalStateException("Cannot restore hbase table", e);
+ } else {
+ // Run restore service
+ Path[] dirs = new Path[regionPathList.size()];
+ regionPathList.toArray(dirs);
+ RestoreService restoreService =
+ BackupRestoreServerFactory.getRestoreService(conf);
+
+ restoreService.run(dirs, new TableName[] { tableName }, new TableName[] { newTableName },
+ true);
}
- } else {
- LOG.debug("convert will be supported in a future jira");
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot restore hbase table", e);
}
}
-
/**
* Gets region list
* @param tableArchivePath table archive path
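Both getRegionList() variants above reduce to a single FileSystem.listStatus() call over a table directory: one rooted under the full-backup archive path, the other under the directory of a specific backup id. A standalone sketch of that listing pattern, with a hypothetical backup path (the layout shown is assumed, not taken from the patch):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RegionListSketch {

  // Collect the child directories of a table directory; each child is one region dir.
  static List<Path> listRegionDirs(Configuration conf, Path tableDir) throws IOException {
    FileSystem fs = tableDir.getFileSystem(conf);
    List<Path> regionDirs = new ArrayList<>();
    for (FileStatus child : fs.listStatus(tableDir)) {
      if (child.isDirectory()) {
        regionDirs.add(child.getPath());
      }
    }
    return regionDirs;
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical path; substitute the real backup root, backup id and table.
    Path tableDir = new Path("hdfs:///backup/backup_001/default/t1");
    for (Path regionDir : listRegionDirs(conf, tableDir)) {
      System.out.println(regionDir);
    }
  }
}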
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
new file mode 100644
index 0000000..dfcd7be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple input format for HFiles.
+ * This code was borrowed from Apache Crunch project.
+ * Updated to work with recent versions of HBase.
+ */
+public class HFileInputFormat2 extends FileInputFormat<NullWritable, Cell> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat2.class);
+
+ /**
+ * File filter that removes all "hidden" files. This might be something worth removing from
+ * a more general purpose utility; it accounts for the presence of metadata files created
+ * in the way we're doing exports.
+ */
+ static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Record reader for HFiles.
+ */
+ private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
+
+ private Reader in;
+ protected Configuration conf;
+ private HFileScanner scanner;
+
+ /**
+ * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+ */
+ private Cell value = null;
+ private long count;
+ private boolean seeked = false;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit) split;
+ conf = context.getConfiguration();
+ Path path = fileSplit.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ LOG.info("Initialize HFileRecordReader for {}", path);
+ this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf);
+
+ // The file info must be loaded before the scanner can be used.
+ // This seems like a bug in HBase, but it's easily worked around.
+ this.in.loadFileInfo();
+ this.scanner = in.getScanner(false, false);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ boolean hasNext;
+ if (!seeked) {
+ LOG.info("Seeking to start");
+ hasNext = scanner.seekTo();
+ seeked = true;
+ } else {
+ hasNext = scanner.next();
+ }
+ if (!hasNext) {
+ return false;
+ }
+ value = scanner.getCell();
+ count++;
+ return true;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public Cell getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
+ // the start row, but better than nothing anyway.
+ return 1.0f * count / in.getEntries();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ }
+ }
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+
+ // Explode out directories that match the original FileInputFormat filters
+ // since HFiles are written to directories where the
+ // directory name is the column name
+ for (FileStatus status : super.listStatus(job)) {
+ if (status.isDirectory()) {
+ FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+ for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+ result.add(match);
+ }
+ } else {
+ result.add(status);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new HFileRecordReader();
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ // This file isn't splittable.
+ return false;
+ }
+}
\ No newline at end of file
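For readers who want to drive HFileInputFormat2 from their own MapReduce job, a minimal map-only sketch; it assumes this patch's class is on the classpath, uses the default identity Mapper, and takes the HFile directory as the first command-line argument (all output is discarded):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class HFileScanJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "scan-hfiles");
    job.setJarByClass(HFileScanJobSketch.class);
    // Read raw Cells out of the HFiles under the supplied directory.
    job.setInputFormatClass(HFileInputFormat2.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Map-only job with the default (identity) Mapper; output records are dropped.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}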
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c54eee0..8c794c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.impl.RestoreTablesProcedure;
import org.apache.hadoop.hbase.backup.master.FullTableBackupProcedure;
import org.apache.hadoop.hbase.backup.master.IncrementalTableBackupProcedure;
+import org.apache.hadoop.hbase.backup.master.RestoreTablesProcedure;
import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index ff5e739..ec53a64 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -284,7 +284,7 @@ public class TestBackupBase {
FileSystem fs = FileSystem.get(conf1);
RemoteIterator<LocatedFileStatus> it = fs.listFiles( new Path(BACKUP_ROOT_DIR), true);
while(it.hasNext()){
- LOG.debug("DDEBUG: "+it.next().getPath());
+ LOG.debug(it.next().getPath());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 2251c74..fe00ac5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertTrue;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -29,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
import org.apache.hadoop.hbase.client.BackupAdmin;
@@ -37,14 +37,16 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@@ -66,6 +68,8 @@ public class TestIncrementalBackup extends TestBackupBase {
//implement all test cases in 1 test since incremental backup/restore has dependencies
@Test
public void TestIncBackupRestore() throws Exception {
+
+ int ADD_ROWS = 99;
// #1 - create full backup for all tables
LOG.info("create full backup image for all tables");
@@ -88,13 +92,13 @@ public class TestIncrementalBackup extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table
- HTable t1 = insertIntoTable(conn, table1, famName, 1, NB_ROWS_IN_BATCH);
- LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
+ HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(
- NB_ROWS_IN_BATCH * 2 + NB_ROWS_FAM3));
+ NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3));
t1.close();
- LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1);
+ LOG.debug("written " + ADD_ROWS + " rows to " + table1);
HTable t2 = (HTable) conn.getTable(table2);
Put p2;
@@ -107,7 +111,23 @@ public class TestIncrementalBackup extends TestBackupBase {
Assert.assertThat(TEST_UTIL.countRows(t2), CoreMatchers.equalTo(NB_ROWS_IN_BATCH + 5));
t2.close();
LOG.debug("written " + 5 + " rows to " + table2);
+ // split table1
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ List<HRegion> regions = cluster.getRegions(table1);
+
+ byte[] name = regions.get(0).getRegionInfo().getRegionName();
+ long startSplitTime = EnvironmentEdgeManager.currentTime();
+ admin.splitRegion(name);
+
+ while (!admin.isTableAvailable(table1)) {
+ Thread.sleep(100);
+ }
+
+ long endSplitTime = EnvironmentEdgeManager.currentTime();
+ // split finished
+    LOG.debug("split finished in " + (endSplitTime - startSplitTime) + " ms");
+
// #3 - incremental backup for multiple tables
tables = Lists.newArrayList(table1, table2);
request = new BackupRequest();
@@ -176,7 +196,7 @@ public class TestIncrementalBackup extends TestBackupBase {
LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
Assert.assertThat(TEST_UTIL.countRows(hTable, famName),
- CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+ CoreMatchers.equalTo(NB_ROWS_IN_BATCH + ADD_ROWS));
LOG.debug("f2 has " + TEST_UTIL.countRows(hTable, fam2Name) + " rows");
Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name), CoreMatchers.equalTo(NB_ROWS_FAM2));
hTable.close();