Posted to commits@hbase.apache.org by te...@apache.org on 2017/03/28 23:23:44 UTC
hbase git commit: HBASE-14417 Incremental backup and bulk loading
Repository: hbase
Updated Branches:
refs/heads/master cb4fac1d1 -> 0345fc877
HBASE-14417 Incremental backup and bulk loading
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0345fc87
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0345fc87
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0345fc87
Branch: refs/heads/master
Commit: 0345fc87759a7d44ecc385327ebb586fc632fb65
Parents: cb4fac1
Author: tedyu <yu...@gmail.com>
Authored: Tue Mar 28 16:23:36 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Mar 28 16:23:36 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/backup/BackupHFileCleaner.java | 180 +++++++++
.../hadoop/hbase/backup/BackupObserver.java | 102 +++++
.../hbase/backup/impl/BackupAdminImpl.java | 28 +-
.../hadoop/hbase/backup/impl/BackupManager.java | 16 +
.../hbase/backup/impl/BackupSystemTable.java | 393 ++++++++++++++++++-
.../impl/IncrementalTableBackupClient.java | 124 ++++++
.../hbase/backup/impl/RestoreTablesClient.java | 96 +++--
.../backup/mapreduce/MapReduceRestoreJob.java | 6 +-
.../hbase/mapreduce/LoadIncrementalHFiles.java | 20 +-
.../hadoop/hbase/util/HFileArchiveUtil.java | 16 +
.../hadoop/hbase/backup/TestBackupBase.java | 15 +
.../hbase/backup/TestBackupHFileCleaner.java | 141 +++++++
.../TestIncrementalBackupWithBulkLoad.java | 145 +++++++
.../mapreduce/TestLoadIncrementalHFiles.java | 46 ++-
14 files changed, 1275 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
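
The new pieces are wired in through configuration: the tests below rely on BackupManager.decorateMasterConfiguration()/decorateRegionServerConfiguration() plus the region coprocessor key. A minimal manual sketch, assuming the stock "hbase.master.hfilecleaner.plugins" key and the "hbase.backup.enable" literal (neither literal appears in this diff, so treat both as assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
import org.apache.hadoop.hbase.backup.BackupObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public class BackupWiringSketch {
  public static Configuration decorate() {
    Configuration conf = HBaseConfiguration.create();
    // BackupRestoreConstants.BACKUP_ENABLE_KEY; the literal is an assumption here
    conf.setBoolean("hbase.backup.enable", true);
    // register the archive cleaner with the master's HFileCleaner; the plugin
    // key is the stock cleaner list and is not part of this diff
    String cleaners = conf.get("hbase.master.hfilecleaner.plugins", "");
    conf.set("hbase.master.hfilecleaner.plugins",
        cleaners.isEmpty() ? BackupHFileCleaner.class.getName()
            : cleaners + "," + BackupHFileCleaner.class.getName());
    // register the region observer, mirroring the TestBackupBase change below
    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        (coproc == null ? "" : coproc + ",") + BackupObserver.class.getName());
    return conf;
  }
}
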
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
new file mode 100644
index 0000000..b6b4c0a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * Implementation of a file cleaner that checks if an hfile is still referenced by a backup
+ * before deleting it from the hfile archive directory.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
+ private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class);
+ private boolean stopped = false;
+ private boolean aborted;
+ private Configuration conf;
+ private Connection connection;
+ private long prevReadFromBackupTbl = 0, // timestamp of most recent read from hbase:backup table
+ secondPrevReadFromBackupTbl = 0; // timestamp of 2nd most recent read from hbase:backup table
+ // used by unit tests to skip reading hbase:backup
+ private boolean checkForFullyBackedUpTables = true;
+ private List<TableName> fullyBackedUpTables = null;
+
+ private Set<String> getFilenameFromBulkLoad(Map<byte[], List<Path>>[] maps) {
+ Set<String> filenames = new HashSet<String>();
+ for (Map<byte[], List<Path>> map : maps) {
+ if (map == null) continue;
+ for (List<Path> paths : map.values()) {
+ for (Path p : paths) {
+ filenames.add(p.getName());
+ }
+ }
+ }
+ return filenames;
+ }
+
+ private Set<String> loadHFileRefs(List<TableName> tableList) throws IOException {
+ if (connection == null) {
+ connection = ConnectionFactory.createConnection(conf);
+ }
+ try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
+ Map<byte[], List<Path>>[] res =
+ tbl.readBulkLoadedFiles(null, tableList);
+ secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
+ prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
+ return getFilenameFromBulkLoad(res);
+ }
+ }
+
+ @VisibleForTesting
+ void setCheckForFullyBackedUpTables(boolean b) {
+ checkForFullyBackedUpTables = b;
+ }
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ if (conf == null) {
+ return files;
+ }
+ // obtain the Set of TableName's which have been fully backed up
+ // so that we filter BulkLoad to be returned from server
+ if (checkForFullyBackedUpTables) {
+ if (connection == null) return files;
+ try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
+ fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+ } catch (IOException ioe) {
+ LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
+ return Collections.emptyList();
+ }
+ Collections.sort(fullyBackedUpTables);
+ }
+ final Set<String> hfileRefs;
+ try {
+ hfileRefs = loadHFileRefs(fullyBackedUpTables);
+ } catch (IOException ioe) {
+ LOG.error("Failed to read hfile references, skipping checking deletable files", ioe);
+ return Collections.emptyList();
+ }
+ Iterable<FileStatus> deletables = Iterables.filter(files, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus file) {
+ // If the file is recent, be conservative and wait for one more scan of hbase:backup table
+ if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
+ return false;
+ }
+ String hfile = file.getPath().getName();
+ boolean foundHFileRef = hfileRefs.contains(hfile);
+ return !foundHFileRef;
+ }
+ });
+ return deletables;
+ }
+
+ @Override
+ public boolean isFileDeletable(FileStatus fStat) {
+ // work is done in getDeletableFiles()
+ return true;
+ }
+
+ @Override
+ public void setConf(Configuration config) {
+ this.conf = config;
+ this.connection = null;
+ try {
+ this.connection = ConnectionFactory.createConnection(conf);
+ } catch (IOException ioe) {
+ LOG.error("Couldn't establish connection", ioe);
+ }
+ }
+
+ @Override
+ public void stop(String why) {
+ if (this.stopped) {
+ return;
+ }
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (IOException ioe) {
+ LOG.debug("Got " + ioe + " when closing connection");
+ }
+ }
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
+ this.aborted = true;
+ stop(why);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+}
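
A minimal usage sketch of the cleaner, assuming a running cluster with the hbase:backup system table; it lives in the org.apache.hadoop.hbase.backup package because setCheckForFullyBackedUpTables() is package-private:

package org.apache.hadoop.hbase.backup; // the test hook used here is package-private

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import com.google.common.collect.Lists;

public class BackupHFileCleanerSketch {
  // Returns the archived hfiles the cleaner would let the master delete.
  public static List<FileStatus> deletable(Configuration conf, List<FileStatus> files) {
    BackupHFileCleaner cleaner = new BackupHFileCleaner();
    cleaner.setConf(conf); // opens a Connection to read hbase:backup
    cleaner.setCheckForFullyBackedUpTables(false); // unit-test hook: skip the FULL-backup lookup
    try {
      // files modified after the 2nd-most-recent hbase:backup read are always retained
      return Lists.newArrayList(cleaner.getDeletableFiles(files));
    } finally {
      cleaner.stop("sketch done");
    }
  }
}
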
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
new file mode 100644
index 0000000..595e862
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An Observer to facilitate backup operations
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupObserver implements RegionObserver {
+ private static final Log LOG = LogFactory.getLog(BackupObserver.class);
+ @Override
+ public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
+ boolean hasLoaded) throws IOException {
+ Configuration cfg = ctx.getEnvironment().getConfiguration();
+ if (!hasLoaded) {
+ // there is no need to record state
+ return hasLoaded;
+ }
+ if (finalPaths == null || !BackupManager.isBackupEnabled(cfg)) {
+ LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
+ return hasLoaded;
+ }
+ try (Connection connection = ConnectionFactory.createConnection(cfg);
+ BackupSystemTable tbl = new BackupSystemTable(connection)) {
+ List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+ HRegionInfo info = ctx.getEnvironment().getRegionInfo();
+ TableName tableName = info.getTable();
+ if (!fullyBackedUpTables.contains(tableName)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(tableName + " has not gone thru full backup");
+ }
+ return hasLoaded;
+ }
+ tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
+ return hasLoaded;
+ } catch (IOException ioe) {
+ LOG.error("Failed to get tables which have been fully backed up", ioe);
+ return false;
+ }
+ }
+ @Override
+ public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+ Configuration cfg = ctx.getEnvironment().getConfiguration();
+ if (pairs == null || pairs.isEmpty() || !BackupManager.isBackupEnabled(cfg)) {
+ LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
+ return;
+ }
+ try (Connection connection = ConnectionFactory.createConnection(cfg);
+ BackupSystemTable tbl = new BackupSystemTable(connection)) {
+ List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+ HRegionInfo info = ctx.getEnvironment().getRegionInfo();
+ TableName tableName = info.getTable();
+ if (!fullyBackedUpTables.contains(tableName)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(tableName + " has not gone thru full backup");
+ }
+ return;
+ }
+ tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
+ return;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index c1d5258..eb60860 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -188,7 +188,33 @@ public class BackupAdminImpl implements BackupAdmin {
removeTableFromBackupImage(info, tn, sysTable);
}
}
- LOG.debug("Delete backup info " + backupInfo.getBackupId());
+ Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupId);
+ FileSystem fs = FileSystem.get(conn.getConfiguration());
+ boolean succ = true;
+ int numDeleted = 0;
+ for (String f : map.values()) {
+ Path p = new Path(f);
+ try {
+ LOG.debug("Delete backup info " + p + " for " + backupInfo.getBackupId());
+ if (!fs.delete(p)) {
+ if (fs.exists(p)) {
+ LOG.warn(f + " was not deleted");
+ succ = false;
+ }
+ } else {
+ numDeleted++;
+ }
+ } catch (IOException ioe) {
+ LOG.warn(f + " was not deleted", ioe);
+ succ = false;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
+ }
+ if (succ) {
+ sysTable.deleteBulkLoadedFiles(map);
+ }
sysTable.deleteBackupInfo(backupInfo.getBackupId());
LOG.info("Delete backup " + backupInfo.getBackupId() + " completed.");
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 1d27e79..c09ce48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
+import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
@@ -393,6 +395,20 @@ public class BackupManager implements Closeable {
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}
+ public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+ readBulkloadRows(List<TableName> tableList) throws IOException {
+ return systemTable.readBulkloadRows(tableList);
+ }
+
+ public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
+ systemTable.removeBulkLoadedRows(lst, rows);
+ }
+
+ public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
+ throws IOException {
+ systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
+ }
+
/**
* Get all completed backup information (in desc order by time)
* @return history info of BackupCompleteData
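
readBulkloadRows() below returns a deeply nested structure: table -> encoded region -> column family -> list of (hfile path, prepared?) pairs, plus the raw row keys so the rows can be removed once the files are copied. A traversal sketch over that type:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;

public class BulkloadRowsSketch {
  // walks the first element of the Pair returned by BackupManager.readBulkloadRows(tables)
  static void dump(Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map) {
    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> t
        : map.entrySet()) {
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> region
          : t.getValue().entrySet()) {
        for (Map.Entry<String, List<Pair<String, Boolean>>> fam : region.getValue().entrySet()) {
          for (Pair<String, Boolean> file : fam.getValue()) {
            // getSecond() is true for rows written by the preCommitStoreFile hook ("prepared")
            System.out.println(t.getKey() + " " + region.getKey() + " " + fam.getKey()
                + " " + file.getFirst() + " prepared=" + file.getSecond());
          }
        }
      }
    }
  }
}
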
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 6362f8e..1ba8087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -22,11 +22,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -44,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
@@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
/**
* This class provides API to access backup system table<br>
@@ -77,6 +82,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*/
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {
+ private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
static class WALItem {
String backupId;
@@ -108,8 +114,6 @@ public final class BackupSystemTable implements Closeable {
}
- private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
-
private TableName tableName;
/**
* Stores backup sessions (contexts)
@@ -119,6 +123,7 @@ public final class BackupSystemTable implements Closeable {
* Stores other meta
*/
final static byte[] META_FAMILY = "meta".getBytes();
+ final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
/**
* Connection to HBase cluster, shared among all instances
*/
@@ -130,9 +135,22 @@ public final class BackupSystemTable implements Closeable {
private final static String INCR_BACKUP_SET = "incrbackupset:";
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
private final static String RS_LOG_TS_PREFIX = "rslogts:";
+
+ private final static String BULK_LOAD_PREFIX = "bulk:";
+ private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
+ final static byte[] TBL_COL = Bytes.toBytes("tbl");
+ final static byte[] FAM_COL = Bytes.toBytes("fam");
+ final static byte[] PATH_COL = Bytes.toBytes("path");
+ final static byte[] STATE_COL = Bytes.toBytes("state");
+ // the two states a bulk loaded file can be
+ final static byte[] BL_PREPARE = Bytes.toBytes("R");
+ final static byte[] BL_COMMIT = Bytes.toBytes("D");
+
private final static String WALS_PREFIX = "wals:";
private final static String SET_KEY_PREFIX = "backupset:";
+ // separator between BULK_LOAD_PREFIX and ordinals
+ protected final static String BLK_LD_DELIM = ":";
private final static byte[] EMPTY_VALUE = new byte[] {};
// Safe delimiter in a string
@@ -196,6 +214,97 @@ public final class BackupSystemTable implements Closeable {
}
}
+ /*
+ * @param backupId the backup Id
+ * @return Map of row keys to paths of bulk loaded hfiles
+ */
+ Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
+ Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result res = null;
+ Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ byte[] row = CellUtil.cloneRow(res.listCells().get(0));
+ for (Cell cell : res.listCells()) {
+ if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+ BackupSystemTable.PATH_COL.length) == 0) {
+ map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
+ }
+ }
+ }
+ return map;
+ }
+ }
+
+ /*
+ * Used during restore
+ * @param backupId the backup Id
+ * @param sTableList List of tables
+ * @return array of Map of family to List of Paths
+ */
+ public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
+ throws IOException {
+ Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
+ Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ TableName tbl = null;
+ byte[] fam = null;
+ String path = null;
+ for (Cell cell : res.listCells()) {
+ if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
+ BackupSystemTable.TBL_COL.length) == 0) {
+ tbl = TableName.valueOf(CellUtil.cloneValue(cell));
+ } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
+ BackupSystemTable.FAM_COL.length) == 0) {
+ fam = CellUtil.cloneValue(cell);
+ } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+ BackupSystemTable.PATH_COL.length) == 0) {
+ path = Bytes.toString(CellUtil.cloneValue(cell));
+ }
+ }
+ int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
+ if (srcIdx == -1) {
+ // the table is not among the query
+ continue;
+ }
+ if (mapForSrc[srcIdx] == null) {
+ mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ }
+ List<Path> files;
+ if (!mapForSrc[srcIdx].containsKey(fam)) {
+ files = new ArrayList<Path>();
+ mapForSrc[srcIdx].put(fam, files);
+ } else {
+ files = mapForSrc[srcIdx].get(fam);
+ }
+ files.add(new Path(path));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+ }
+ }
+ return mapForSrc;
+ }
+ }
+
+ /*
+ * @param map Map of row keys to path of bulk loaded hfile
+ */
+ void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
+ try (Table table = connection.getTable(tableName)) {
+ List<Delete> dels = new ArrayList<>();
+ for (byte[] row : map.keySet()) {
+ dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
+ }
+ table.delete(dels);
+ }
+ }
+
/**
* Deletes backup status from the backup system table
* @param backupId backup id
@@ -213,6 +322,156 @@ public final class BackupSystemTable implements Closeable {
}
}
+ /*
+ * For postBulkLoadHFile() hook.
+ * @param tabName table name
+ * @param region the region receiving hfile
+ * @param finalPaths family and associated hfiles
+ */
+ public void writePathsPostBulkLoad(TableName tabName, byte[] region,
+ Map<byte[], List<Path>> finalPaths) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
+ finalPaths.size() + " entries");
+ }
+ try (Table table = connection.getTable(tableName)) {
+ List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
+ finalPaths);
+ table.put(puts);
+ LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+ }
+ }
+ /*
+ * For preCommitStoreFile() hook
+ * @param tabName table name
+ * @param region the region receiving hfile
+ * @param family column family
+ * @param pairs list of paths for hfiles
+ */
+ public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
+ final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
+ pairs.size() + " entries");
+ }
+ try (Table table = connection.getTable(tableName)) {
+ List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
+ family, pairs);
+ table.put(puts);
+ LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+ }
+ }
+
+ /*
+ * Removes rows recording bulk loaded hfiles from backup table
+ * @param lst list of table names
+ * @param rows the rows to be deleted
+ */
+ public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
+ try (Table table = connection.getTable(tableName)) {
+ List<Delete> lstDels = new ArrayList<>();
+ for (byte[] row : rows) {
+ Delete del = new Delete(row);
+ lstDels.add(del);
+ LOG.debug("orig deleting the row: " + Bytes.toString(row));
+ }
+ table.delete(lstDels);
+ LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
+ }
+ }
+
+ /*
+ * Reads the rows from backup table recording bulk loaded hfiles
+ * @param tableList list of table names
+ * @return The keys of the Map are table, region and column family.
+ * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
+ */
+ public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+ readBulkloadRows(List<TableName> tableList) throws IOException {
+ Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
+ List<byte[]> rows = new ArrayList<>();
+ for (TableName tTable : tableList) {
+ Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
+ Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
+ try (Table table = connection.getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ String fam = null;
+ String path = null;
+ boolean raw = false;
+ byte[] row = null;
+ String region = null;
+ for (Cell cell : res.listCells()) {
+ row = CellUtil.cloneRow(cell);
+ rows.add(row);
+ String rowStr = Bytes.toString(row);
+ region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
+ if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
+ BackupSystemTable.FAM_COL.length) == 0) {
+ fam = Bytes.toString(CellUtil.cloneValue(cell));
+ } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
+ BackupSystemTable.PATH_COL.length) == 0) {
+ path = Bytes.toString(CellUtil.cloneValue(cell));
+ } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
+ BackupSystemTable.STATE_COL.length) == 0) {
+ byte[] state = CellUtil.cloneValue(cell);
+ if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
+ raw = true;
+ } else raw = false;
+ }
+ }
+ if (map.get(tTable) == null) {
+ map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
+ tblMap = map.get(tTable);
+ }
+ if (tblMap.get(region) == null) {
+ tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
+ }
+ Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
+ if (famMap.get(fam) == null) {
+ famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
+ }
+ famMap.get(fam).add(new Pair<>(path, raw));
+ LOG.debug("found orig " + path + " for " + fam + " of table " + region);
+ }
+ }
+ }
+ return new Pair<>(map, rows);
+ }
+
+ /*
+ * @param sTableList List of tables
+ * @param maps array of Map of family to List of Paths
+ * @param backupId the backup Id
+ */
+ public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
+ String backupId) throws IOException {
+ try (Table table = connection.getTable(tableName)) {
+ long ts = EnvironmentEdgeManager.currentTime();
+ int cnt = 0;
+ List<Put> puts = new ArrayList<>();
+ for (int idx = 0; idx < maps.length; idx++) {
+ Map<byte[], List<Path>> map = maps[idx];
+ TableName tn = sTableList.get(idx);
+ if (map == null) continue;
+ for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
+ byte[] fam = entry.getKey();
+ List<Path> paths = entry.getValue();
+ for (Path p : paths) {
+ Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
+ backupId, ts, cnt++);
+ puts.add(put);
+ }
+ }
+ }
+ if (!puts.isEmpty()) {
+ table.put(puts);
+ }
+ }
+ }
+
/**
* Reads backup status object (instance of backup info) from the backup system table
* @param backupId backup id
@@ -399,6 +658,21 @@ public final class BackupSystemTable implements Closeable {
}
+ /*
+ * Retrieve TableNames for completed backups of the given type
+ * @param type backup type
+ * @return List of table names
+ */
+ public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+ Set<TableName> names = new HashSet<>();
+ List<BackupInfo> infos = getBackupHistory(true);
+ for (BackupInfo info : infos) {
+ if (info.getType() != type) continue;
+ names.addAll(info.getTableNames());
+ }
+ return new ArrayList<>(names);
+ }
+
/**
* Get history for backup destination
* @param backupRoot backup destination path
@@ -1233,6 +1507,119 @@ public final class BackupSystemTable implements Closeable {
return s.substring(index + 1);
}
+ /*
+ * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
+ */
+ static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
+ Map<byte[], List<Path>> finalPaths) {
+ List<Put> puts = new ArrayList<>();
+ for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+ for (Path path : entry.getValue()) {
+ String file = path.toString();
+ int lastSlash = file.lastIndexOf("/");
+ String filename = file.substring(lastSlash+1);
+ Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+ Bytes.toString(region), BLK_LD_DELIM, filename));
+ put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
+ put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
+ put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
+ file.getBytes());
+ put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
+ puts.add(put);
+ LOG.debug("writing done bulk path " + file + " for " + table + " " +
+ Bytes.toString(region));
+ }
+ }
+ return puts;
+ }
+
+ /*
+ * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
+ */
+ static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
+ final byte[] family, final List<Pair<Path, Path>> pairs) {
+ List<Put> puts = new ArrayList<>();
+ for (Pair<Path, Path> pair : pairs) {
+ Path path = pair.getSecond();
+ String file = path.toString();
+ int lastSlash = file.lastIndexOf("/");
+ String filename = file.substring(lastSlash+1);
+ Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+ Bytes.toString(region), BLK_LD_DELIM, filename));
+ put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
+ put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
+ put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
+ file.getBytes());
+ put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
+ puts.add(put);
+ LOG.debug("writing raw bulk path " + file + " for " + table + " " +
+ Bytes.toString(region));
+ }
+ return puts;
+ }
+ public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
+ List<Delete> lstDels = new ArrayList<>();
+ for (TableName table : lst) {
+ Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
+ del.addFamily(BackupSystemTable.META_FAMILY);
+ lstDels.add(del);
+ }
+ return lstDels;
+ }
+
+ static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
+ Scan scan = new Scan();
+ byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.withStartRow(startRow);
+ scan.withStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+ scan.setMaxVersions(1);
+ return scan;
+ }
+ static String getTableNameFromOrigBulkLoadRow(String rowStr) {
+ String[] parts = rowStr.split(BLK_LD_DELIM);
+ return parts[1];
+ }
+ static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
+ // format is bulk : namespace : table : region : file
+ String[] parts = rowStr.split(BLK_LD_DELIM);
+ int idx = 3;
+ if (parts.length == 4) {
+ // the table is in default namespace
+ idx = 2;
+ }
+ LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
+ return parts[idx];
+ }
+ /*
+ * Used to query bulk loaded hfiles which have been copied by incremental backup
+ * @param backupId the backup Id. It can be null when querying for all tables
+ * @return the Scan object
+ */
+ static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
+ Scan scan = new Scan();
+ byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
+ rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM);
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ //scan.setTimeRange(lower, Long.MAX_VALUE);
+ scan.addFamily(BackupSystemTable.META_FAMILY);
+ scan.setMaxVersions(1);
+ return scan;
+ }
+
+ static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
+ long ts, int idx) {
+ Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx));
+ put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
+ put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
+ put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
+ return put;
+ }
/**
* Creates put list for list of WAL files
* @param files list of WAL file paths
@@ -1364,7 +1751,7 @@ public final class BackupSystemTable implements Closeable {
return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
}
- private byte[] rowkey(String s, String... other) {
+ private static byte[] rowkey(String s, String... other) {
StringBuilder sb = new StringBuilder(s);
for (String ss : other) {
sb.append(ss);
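
Two bulk-load row-key layouts now live under the "bulk:" prefix: observer-written rows keyed by table, encoded region, and file name, and rows for files already copied by an incremental backup, keyed by backup id, timestamp, and ordinal. A plain-string demo of the region parsing above, with hypothetical key values:

public class BulkLoadRowKeyDemo {
  // mirrors BackupSystemTable.getRegionNameFromOrigBulkLoadRow above
  static String region(String rowStr) {
    String[] parts = rowStr.split(":");
    return parts.length == 4 ? parts[2] : parts[3]; // default-namespace rows have one fewer part
  }

  public static void main(String[] args) {
    // observer-written row, table "prod:usertable": bulk : ns : table : region : file
    System.out.println(region("bulk:prod:usertable:f0e1d2c3b4a5:hfile-7")); // f0e1d2c3b4a5
    // observer-written row, default namespace: bulk : table : region : file
    System.out.println(region("bulk:usertable:f0e1d2c3b4a5:hfile-7"));      // f0e1d2c3b4a5
    // rows written after incremental backup copies the files (createPutForBulkLoadedFile):
    //   bulk : backupId : timestamp : ordinal, e.g. "bulk:backup_1490742634299:1490742634500:0"
  }
}
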
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 395ed6d..8f6f264 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -18,15 +18,21 @@
package org.apache.hadoop.hbase.backup.impl;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -40,6 +46,10 @@ import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Incremental backup implementation.
@@ -154,6 +164,118 @@ public class IncrementalTableBackupClient extends TableBackupClient {
return list;
}
+ static int getIndex(TableName tbl, List<TableName> sTableList) {
+ if (sTableList == null) return 0;
+ for (int i = 0; i < sTableList.size(); i++) {
+ if (tbl.equals(sTableList.get(i))) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /*
+ * Reads bulk load records from backup table, iterates through the records and forms the paths
+ * for bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination
+ * @param sTableList list of tables to be backed up
+ * @return map of table to List of files
+ */
+ Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
+ Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+ Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
+ backupManager.readBulkloadRows(sTableList);
+ Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem tgtFs;
+ try {
+ tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
+ } catch (URISyntaxException use) {
+ throw new IOException("Unable to get FileSystem", use);
+ }
+ Path rootdir = FSUtils.getRootDir(conf);
+ Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+ for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
+ map.entrySet()) {
+ TableName srcTable = tblEntry.getKey();
+ int srcIdx = getIndex(srcTable, sTableList);
+ if (srcIdx < 0) {
+ LOG.warn("Couldn't find " + srcTable + " in source table List");
+ continue;
+ }
+ if (mapForSrc[srcIdx] == null) {
+ mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+ }
+ Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
+ Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
+ srcTable.getQualifierAsString());
+ for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry :
+ tblEntry.getValue().entrySet()){
+ String regionName = regionEntry.getKey();
+ Path regionDir = new Path(tblDir, regionName);
+ // map from family to List of hfiles
+ for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry :
+ regionEntry.getValue().entrySet()) {
+ String fam = famEntry.getKey();
+ Path famDir = new Path(regionDir, fam);
+ List<Path> files;
+ if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
+ files = new ArrayList<Path>();
+ mapForSrc[srcIdx].put(fam.getBytes(), files);
+ } else {
+ files = mapForSrc[srcIdx].get(fam.getBytes());
+ }
+ Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+ String tblName = srcTable.getQualifierAsString();
+ Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
+ if (!tgtFs.mkdirs(tgtFam)) {
+ throw new IOException("couldn't create " + tgtFam);
+ }
+ for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
+ String file = fileWithState.getFirst();
+ boolean raw = fileWithState.getSecond();
+ int idx = file.lastIndexOf("/");
+ String filename = file;
+ if (idx > 0) {
+ filename = file.substring(idx+1);
+ }
+ Path p = new Path(famDir, filename);
+ Path tgt = new Path(tgtFam, filename);
+ Path archive = new Path(archiveDir, filename);
+ if (fs.exists(p)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
+ }
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("copying " + p + " to " + tgt);
+ }
+ FileUtil.copy(fs, p, tgtFs, tgt, false, conf);
+ } catch (FileNotFoundException e) {
+ LOG.debug("copying archive " + archive + " to " + tgt);
+ try {
+ FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+ } catch (FileNotFoundException fnfe) {
+ if (!raw) throw fnfe;
+ }
+ }
+ } else {
+ LOG.debug("copying archive " + archive + " to " + tgt);
+ try {
+ FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+ } catch (FileNotFoundException fnfe) {
+ if (!raw) throw fnfe;
+ }
+ }
+ files.add(tgt);
+ }
+ }
+ }
+ }
+ backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
+ backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
+ return mapForSrc;
+ }
+
@Override
public void execute() throws IOException {
@@ -204,6 +326,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
BackupUtils.getMinValue(BackupUtils
.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
+
+ handleBulkLoad(backupInfo.getTableNames());
// backup complete
completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);
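
handleBulkLoad() copies each recorded hfile, from the live store directory or from the archive if it was compacted away, into a per-backup tree that mirrors the table layout, then records the copies and deletes the original rows. A sketch of the target layout built above, with hypothetical names:

import org.apache.hadoop.fs.Path;

public class BulkLoadTargetPathDemo {
  public static void main(String[] args) {
    String backupRootDir = "hdfs://backup/backupUT"; // hypothetical backup root
    String backupId = "backup_1490742634299";        // hypothetical backup id
    Path tgtRoot = new Path(new Path(backupRootDir), backupId);
    Path tgtTable = new Path(new Path(tgtRoot, "default"), "usertable");
    Path tgt = new Path(new Path(new Path(tgtTable, "f0e1d2c3b4a5"), "f1"), "hfile-7");
    // hdfs://backup/backupUT/backup_1490742634299/default/usertable/f0e1d2c3b4a5/f1/hfile-7
    System.out.println(tgt);
  }
}
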
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index f418305..2e4ecce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -19,9 +19,14 @@
package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
@@ -34,10 +39,13 @@ import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
/**
* Restore table implementation
@@ -50,6 +58,7 @@ public class RestoreTablesClient {
private Configuration conf;
private Connection conn;
private String backupId;
+ private String fullBackupId;
private TableName[] sTableArray;
private TableName[] tTableArray;
private String targetRootDir;
@@ -141,6 +150,7 @@ public class RestoreTablesClient {
// We need hFS only for full restore (see the code)
BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
if (manifest.getType() == BackupType.FULL) {
+ fullBackupId = manifest.getBackupImage().getBackupId();
LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
+ tableBackupPath.toString());
restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
@@ -170,7 +180,6 @@ public class RestoreTablesClient {
restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
new TableName[] { tTable }, lastIncrBackupId);
LOG.info(sTable + " has been successfully restored to " + tTable);
-
}
/**
@@ -185,39 +194,74 @@ public class RestoreTablesClient {
TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
boolean truncateIfExists = isOverwrite;
- try {
- for (int i = 0; i < sTableArray.length; i++) {
- TableName table = sTableArray[i];
- BackupManifest manifest = backupManifestMap.get(table);
- // Get the image list of this backup for restore in time order from old
- // to new.
- List<BackupImage> list = new ArrayList<BackupImage>();
- list.add(manifest.getBackupImage());
- TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
- List<BackupImage> depList = manifest.getDependentListByTable(table);
- set.addAll(depList);
- BackupImage[] arr = new BackupImage[set.size()];
- set.toArray(arr);
- restoreImages(arr, table, tTableArray[i], truncateIfExists);
- restoreImageSet.addAll(list);
- if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
- LOG.info("Restore includes the following image(s):");
- for (BackupImage image : restoreImageSet) {
- LOG.info("Backup: "
- + image.getBackupId()
- + " "
- + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+ Set<String> backupIdSet = new HashSet<>();
+ for (int i = 0; i < sTableArray.length; i++) {
+ TableName table = sTableArray[i];
+ BackupManifest manifest = backupManifestMap.get(table);
+ // Get the image list of this backup for restore in time order from old
+ // to new.
+ List<BackupImage> list = new ArrayList<BackupImage>();
+ list.add(manifest.getBackupImage());
+ TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+ List<BackupImage> depList = manifest.getDependentListByTable(table);
+ set.addAll(depList);
+ BackupImage[] arr = new BackupImage[set.size()];
+ set.toArray(arr);
+ restoreImages(arr, table, tTableArray[i], truncateIfExists);
+ restoreImageSet.addAll(list);
+ if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+ LOG.info("Restore includes the following image(s):");
+ for (BackupImage image : restoreImageSet) {
+ LOG.info("Backup: "
+ + image.getBackupId()
+ + " "
+ + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
table));
+ if (image.getType() == BackupType.INCREMENTAL) {
+ backupIdSet.add(image.getBackupId());
+ LOG.debug("adding " + image.getBackupId() + " for bulk load");
+ }
+ }
+ }
+ }
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ List<TableName> sTableList = Arrays.asList(sTableArray);
+ for (String id : backupIdSet) {
+ LOG.debug("restoring bulk load for " + id);
+ Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
+ Map<LoadQueueItem, ByteBuffer> loaderResult;
+ conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+ LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf);
+ for (int i = 0; i < sTableList.size(); i++) {
+ if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
+ loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
+ LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
+ if (loaderResult.isEmpty()) {
+ String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i];
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
}
}
}
- } catch (Exception e) {
- LOG.error("Failed", e);
- throw new IOException(e);
}
LOG.debug("restoreStage finished");
}
+ static long getTsFromBackupId(String backupId) {
+ if (backupId == null) {
+ return 0;
+ }
+ return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1));
+ }
+
+ static boolean withinRange(long a, long lower, long upper) {
+ if (a < lower || a > upper) {
+ return false;
+ }
+ return true;
+ }
+
public void execute() throws IOException {
// case VALIDATION:
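
Backup ids carry their creation timestamp after the last underscore, which is what getTsFromBackupId() parses. A small demo with a hypothetical id:

public class BackupIdTsDemo {
  // mirrors RestoreTablesClient.getTsFromBackupId above
  static long tsFromBackupId(String backupId) {
    if (backupId == null) {
      return 0;
    }
    return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1));
  }

  public static void main(String[] args) {
    System.out.println(tsFromBackupId("backup_1490742634299")); // 1490742634299
  }
}
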
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index ffb61ec..9bafe12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -98,7 +98,7 @@ public class MapReduceRestoreJob implements RestoreJob {
result = player.run(playerArgs);
if (succeeded(result)) {
// do bulk load
- LoadIncrementalHFiles loader = createLoader();
+ LoadIncrementalHFiles loader = createLoader(getConf());
if (LOG.isDebugEnabled()) {
LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
}
@@ -134,13 +134,13 @@ public class MapReduceRestoreJob implements RestoreJob {
return result == 0;
}
- private LoadIncrementalHFiles createLoader() throws IOException {
+ public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
// set configuration for restore:
// LoadIncrementalHFile needs more time
// <name>hbase.rpc.timeout</name> <value>600000</value>
// calculates
Integer milliSecInHour = 3600000;
- Configuration conf = new Configuration(getConf());
+ Configuration conf = new Configuration(config);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
// By default, it is 32 and loader will fail if # of files in any region exceed this
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index f59e24c..80dfd66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -62,6 +63,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
@@ -144,7 +148,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
initialize();
}
- private void initialize() throws Exception {
+ private void initialize() throws IOException {
if (initalized) {
return;
}
@@ -282,6 +286,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public String toString() {
return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
}
+
+ public byte[] getFamily() {
+ return family;
+ }
+
+ public Path getFilePath() {
+ return hfilePath;
+ }
}
/*
@@ -1184,7 +1196,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* If the table is created for the first time, then "completebulkload" reads the files twice.
* More modifications necessary if we want to avoid doing it.
*/
- private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception {
+ private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
final Path hfofDir = new Path(dirPath);
final FileSystem fs = hfofDir.getFileSystem(getConf());
@@ -1238,7 +1250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
- TableName tableName) throws Exception{
+ TableName tableName) throws IOException {
initialize();
try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) {
@@ -1261,7 +1273,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
- boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
+ boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
index faae4ef..8e3e105 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -167,4 +167,20 @@ public class HFileArchiveUtil {
private static Path getArchivePath(final Path rootdir) {
return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY);
}
+
+ /*
+ * @return table name given archive file path
+ */
+ public static TableName getTableName(Path archivePath) {
+ Path p = archivePath;
+ String tbl = null;
+ // namespace is the 4th parent of file
+ for (int i = 0; i < 5; i++) {
+ if (p == null) return null;
+ if (i == 3) tbl = p.getName();
+ p = p.getParent();
+ }
+ if (p == null) return null;
+ return TableName.valueOf(p.getName(), tbl);
+ }
}
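
getTableName() recovers the table from an archived file path by walking its parents through the <root>/archive/data/<namespace>/<table>/<region>/<family>/<hfile> layout. The companion getStoreArchivePath(), which IncrementalTableBackupClient uses above to find compacted-away bulk files, builds the same directory; a sketch with hypothetical region and family names:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class ArchivePathDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // with hbase.rootdir=/hbase this should resolve to
    //   /hbase/archive/data/default/usertable/f0e1d2c3b4a5/f1
    Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf,
        TableName.valueOf("usertable"), "f0e1d2c3b4a5", "f1");
    System.out.println(archiveDir);
  }
}
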
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index ec88549..e6bd73e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -49,6 +49,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -88,6 +92,7 @@ public class TestBackupBase {
protected static String BACKUP_ROOT_DIR = "/backupUT";
protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT";
protected static String provider = "defaultProvider";
+ protected static boolean secure = false;
/**
* @throws java.lang.Exception
@@ -96,6 +101,16 @@ public class TestBackupBase {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
conf1 = TEST_UTIL.getConfiguration();
+ if (secure) {
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+ // setup configuration
+ SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+ }
+ String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+ conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
+ BackupObserver.class.getName());
conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
BackupManager.decorateMasterConfiguration(conf1);
BackupManager.decorateRegionServerConfiguration(conf1);
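
The hunk above appends BackupObserver to whatever region coprocessors are already
configured rather than overwriting them, since the key holds a comma-separated class
list. A minimal sketch of the same append pattern in isolation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public class RegisterBackupObserver {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    String existing = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
    // preserve any coprocessors that are already registered
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        (existing == null ? "" : existing + ",")
            + "org.apache.hadoop.hbase.backup.BackupObserver");
  }
}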
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
new file mode 100644
index 0000000..dfbe106
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestBackupHFileCleaner {
+ private static final Log LOG = LogFactory.getLog(TestBackupHFileCleaner.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Configuration conf = TEST_UTIL.getConfiguration();
+ private static TableName tableName = TableName.valueOf("backup.hfile.cleaner");
+ private static String famName = "fam";
+ static FileSystem fs = null;
+ Path root;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
+ TEST_UTIL.startMiniZKCluster();
+ TEST_UTIL.startMiniCluster(1);
+ fs = FileSystem.get(conf);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setup() throws IOException {
+ root = TEST_UTIL.getDataTestDirOnTestFS();
+ }
+
+ @After
+ public void cleanup() {
+ try {
+ fs.delete(root, true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete files recursively from path " + root);
+ }
+ }
+
+ @Test
+ public void testGetDeletableFiles() throws IOException {
+ // 1. Create a file
+ Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+ fs.createNewFile(file);
+ // 2. Assert file is successfully created
+ assertTrue("Test file not created!", fs.exists(file));
+ BackupHFileCleaner cleaner = new BackupHFileCleaner();
+ cleaner.setConf(conf);
+ cleaner.setCheckForFullyBackedUpTables(false);
+ // 3. Assert that file as is should be deletable
+ List<FileStatus> stats = new ArrayList<>();
+ FileStatus stat = fs.getFileStatus(file);
+ stats.add(stat);
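+ // Note: getDeletableFiles() is invoked twice in this test. The cleaner seems to
+ // track when it last read the backup system table and only passes judgement on
+ // files seen across consecutive reads, so the first call primes that bookkeeping.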
+ cleaner.getDeletableFiles(stats);
+ Iterable<FileStatus> deletable = cleaner.getDeletableFiles(stats);
+ boolean found = false;
+ for (FileStatus stat1 : deletable) {
+ if (stat.equals(stat1)) found = true;
+ }
+ assertTrue("Cleaner should allow to delete this file as there is no hfile reference "
+ + "for it.", found);
+
+ // 4. Add the file as bulk load
+ List<Path> list = new ArrayList<>(1);
+ list.add(file);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ BackupSystemTable sysTbl = new BackupSystemTable(conn)) {
+ List<TableName> sTableList = new ArrayList<>();
+ sTableList.add(tableName);
+ Map<byte[], List<Path>>[] maps = new Map[1];
+ maps[0] = new HashMap<>();
+ maps[0].put(famName.getBytes(), list);
+ sysTbl.writeBulkLoadedFiles(sTableList, maps, "1");
+ }
+
+ // 5. Assert file should not be deletable
+ cleaner.getDeletableFiles(stats);
+ deletable = cleaner.getDeletableFiles(stats);
+ found = false;
+ for (FileStatus stat1 : deletable) {
+ if (stat.equals(stat1)) found = true;
+ }
+ assertFalse("Cleaner should not allow to delete this file as there is a hfile reference "
+ + "for it.", found);
+ }
+}
\ No newline at end of file
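
TestBackupHFileCleaner drives the cleaner directly. On a live cluster, hfile cleaner
delegates are loaded by the master from the hbase.master.hfilecleaner.plugins list; a
minimal sketch of registering the new cleaner that way (whether the
BackupManager.decorateMasterConfiguration() change in this patch already does this is
not shown in this excerpt, so treat the manual wiring as an assumption):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RegisterBackupHFileCleaner {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    String plugins = conf.get("hbase.master.hfilecleaner.plugins");
    // cleaner plugins are a comma-separated list; append rather than overwrite
    conf.set("hbase.master.hfilecleaner.plugins",
        (plugins == null ? "" : plugins + ",")
            + "org.apache.hadoop.hbase.backup.BackupHFileCleaner");
  }
}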
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
new file mode 100644
index 0000000..c10ec40
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+
+/**
+ * 1. Create table t1
+ * 2. Load data to t1
+ * 3. Full backup t1
+ * 4. Load data to t1
+ * 5. Bulk load into t1
+ * 6. Incremental backup t1
+ */
+@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
+ private static final Log LOG = LogFactory.getLog(TestIncrementalBackupWithBulkLoad.class);
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ secure = true;
+ List<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[] {Boolean.TRUE});
+ return params;
+ }
+
+ public TestIncrementalBackupWithBulkLoad(Boolean b) {
+ }
+ // implement all test cases in one test, since incremental backup/restore has dependencies
+ @Test
+ public void testIncBackupBulkLoad() throws Exception {
+ String testName = "TestIncBackupWithBulkLoad";
+ // #1 - create full backup for all tables
+ LOG.info("create full backup image for all tables");
+
+ List<TableName> tables = Lists.newArrayList(table1);
+ HBaseAdmin admin = null;
+ Connection conn = ConnectionFactory.createConnection(conf1);
+ admin = (HBaseAdmin) conn.getAdmin();
+ BackupAdminImpl client = new BackupAdminImpl(conn);
+
+ BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+ String backupIdFull = client.backupTables(request);
+
+ assertTrue(checkSucceeded(backupIdFull));
+
+ // #2 - insert some data to table table1
+ HTable t1 = (HTable) conn.getTable(table1);
+ Put p1;
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p1 = new Put(Bytes.toBytes("row-t1" + i));
+ p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ t1.put(p1);
+ }
+
+ Assert.assertEquals(NB_ROWS_IN_BATCH * 2, TEST_UTIL.countRows(t1));
+ t1.close();
+
+ int NB_ROWS2 = 20;
+ LOG.debug("bulk loading into " + testName);
+ int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
+ qualName, false, null, new byte[][][] {
+ new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+ }, true, false, true, NB_ROWS_IN_BATCH * 2, NB_ROWS2);
+
+ // #3 - incremental backup for table1
+ tables = Lists.newArrayList(table1);
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String backupIdIncMultiple = client.backupTables(request);
+ assertTrue(checkSucceeded(backupIdIncMultiple));
+
+ // #4 - restore incremental backup for table1
+ TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
+ TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
+ false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+ HTable hTable = (HTable) conn.getTable(table1_restore);
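+ // table1 held NB_ROWS_IN_BATCH rows after setup, step #2 added NB_ROWS_IN_BATCH more,
+ // and the bulk load above contributed 'actual' rows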
+ Assert.assertEquals(NB_ROWS_IN_BATCH * 2 + actual, TEST_UTIL.countRows(hTable));
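+
+ // #5 - take a new full backup; this should clear the bulk load records for table1
+ // from the backup system table (verified below)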
+ request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+
+ backupIdFull = client.backupTables(request);
+ try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+ Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair
+ = table.readBulkloadRows(tables);
+ assertTrue("map still has " + pair.getSecond().size() + " entries",
+ pair.getSecond().isEmpty());
+ }
+ assertTrue(checkSucceeded(backupIdFull));
+
+ hTable.close();
+ admin.close();
+ conn.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0345fc87/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index a6dacf7..7ae5afc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -308,13 +308,14 @@ public class TestLoadIncrementalHFiles {
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
}
- private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
- boolean copyFiles) throws Exception {
+ public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
+ byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+ byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
+ boolean copyFiles, int initRowCount, int factor) throws Exception {
Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+ Path familyDir = new Path(dir, Bytes.toString(fam));
int hfileIdx = 0;
Map<byte[], List<Path>> map = null;
@@ -324,26 +325,26 @@ public class TestLoadIncrementalHFiles {
}
if (useMap) {
map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- map.put(FAMILY, list);
+ map.put(fam, list);
}
Path last = null;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
Path path = new Path(familyDir, "hfile_" + hfileIdx++);
- HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
if (useMap) {
last = path;
list.add(path);
}
}
- int expectedRows = hfileIdx * 1000;
+ int expectedRows = hfileIdx * factor;
- if (preCreateTable || map != null) {
+ final TableName tableName = htd.getTableName();
+ if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
util.getAdmin().createTable(htd, tableSplitKeys);
}
- final TableName tableName = htd.getTableName();
Configuration conf = util.getConfiguration();
if (copyFiles) {
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
@@ -351,12 +352,14 @@ public class TestLoadIncrementalHFiles {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String [] args= {dir.toString(), tableName.toString()};
if (useMap) {
- fs.delete(last);
+ if (deleteFile) fs.delete(last);
Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
- expectedRows -= 1000;
- for (LoadQueueItem item : loaded.keySet()) {
- if (item.hfilePath.getName().equals(last.getName())) {
- fail(last + " should be missing");
+ if (deleteFile) {
+ expectedRows -= factor;
+ for (LoadQueueItem item : loaded.keySet()) {
+ if (item.hfilePath.getName().equals(last.getName())) {
+ fail(last + " should be missing");
+ }
}
}
} else {
@@ -365,19 +368,30 @@ public class TestLoadIncrementalHFiles {
if (copyFiles) {
for (Path p : list) {
- assertTrue(fs.exists(p));
+ assertTrue(p + " should exist", fs.exists(p));
}
}
Table table = util.getConnection().getTable(tableName);
try {
- assertEquals(expectedRows, util.countRows(table));
+ assertEquals(initRowCount + expectedRows, util.countRows(table));
} finally {
table.close();
}
+ return expectedRows;
+ }
+
+ private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+ boolean copyFiles) throws Exception {
+ loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
+ hfileRanges, useMap, true, copyFiles, 0, 1000);
+
+ final TableName tableName = htd.getTableName();
// verify staging folder has been cleaned up
Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
+ FileSystem fs = util.getTestFileSystem();
if(fs.exists(stagingBasePath)) {
FileStatus[] files = fs.listStatus(stagingBasePath);
for(FileStatus file : files) {