Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/31 12:46:09 UTC
[33/47] hbase git commit: HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time
HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c2d5991b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c2d5991b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c2d5991b
Branch: refs/heads/HBASE-21512
Commit: c2d5991b82e3b807cb11f5735ef5068b73720725
Parents: c552088
Author: huzheng <op...@gmail.com>
Authored: Wed Dec 26 16:17:55 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Thu Dec 27 18:22:54 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/mapreduce/CopyTable.java | 109 ++++++++++++------
.../hadoop/hbase/mapreduce/TestCopyTable.java | 110 ++++++++++++++++---
.../hbase/client/ClientSideRegionScanner.java | 14 ++-
.../hadoop/hbase/regionserver/HRegion.java | 2 +-
4 files changed, 187 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
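In short, this change adds a --snapshot mode to CopyTable. Instead of scanning the live
source table, the job reads the store files of a table snapshot (restored into a per-job
unique temp dir) and either writes the cells to the destination table or, with
--bulkload, writes HFiles and bulk loads them. A minimal sketch of driving the new mode
programmatically, mirroring the tests below (the configuration and table/snapshot names
are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.util.ToolRunner;

    public class CopyFromSnapshotExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Read 'sourceTableSnapshot', write HFiles, then bulk load them into 'destTable'.
        int code = ToolRunner.run(conf, new CopyTable(), new String[] {
            "--snapshot", "--new.name=destTable", "--bulkload", "sourceTableSnapshot" });
        System.exit(code);
      }
    }

Dropping --bulkload keeps the same snapshot input but writes to the destination table
with ordinary Puts via the Importer mapper.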
http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 4e57f54..b59c9e6 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
+import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
+import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -70,8 +72,34 @@ public class CopyTable extends Configured implements Tool {
boolean bulkload = false;
Path bulkloadDir = null;
+ boolean readingSnapshot = false;
+ String snapshot = null;
+
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
+ FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
+ Path dir = new Path(fs.getWorkingDirectory(), NAME);
+ if (!fs.exists(dir)) {
+ fs.mkdirs(dir);
+ }
+ Path newDir = new Path(dir, UUID.randomUUID().toString());
+ if (withDirCreated) {
+ fs.mkdirs(newDir);
+ }
+ return newDir;
+ }
+
+ private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
+ Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
+ if (readingSnapshot) {
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
+ generateUniqTempDir(true));
+ } else {
+ TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
+ }
+ }
+
/**
* Sets up the actual job.
*
@@ -79,13 +107,13 @@ public class CopyTable extends Configured implements Tool {
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
- public Job createSubmittableJob(String[] args)
- throws IOException {
+ public Job createSubmittableJob(String[] args) throws IOException {
if (!doCommandLine(args)) {
return null;
}
- Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+ String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
+ Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
job.setJarByClass(CopyTable.class);
Scan scan = new Scan();
@@ -107,15 +135,15 @@ public class CopyTable extends Configured implements Tool {
job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
}
if (versions >= 0) {
- scan.setMaxVersions(versions);
+ scan.readVersions(versions);
}
if (startRow != null) {
- scan.setStartRow(Bytes.toBytesBinary(startRow));
+ scan.withStartRow(Bytes.toBytesBinary(startRow));
}
if (stopRow != null) {
- scan.setStopRow(Bytes.toBytesBinary(stopRow));
+ scan.withStopRow(Bytes.toBytesBinary(stopRow));
}
if(families != null) {
@@ -140,24 +168,13 @@ public class CopyTable extends Configured implements Tool {
job.setNumReduceTasks(0);
if (bulkload) {
- TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
- job);
+ initCopyTableMapperReducerJob(job, scan);
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
- FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
- Random rand = new Random();
- Path root = new Path(fs.getWorkingDirectory(), "copytable");
- fs.mkdirs(root);
- while (true) {
- bulkloadDir = new Path(root, "" + rand.nextLong());
- if (!fs.exists(bulkloadDir)) {
- break;
- }
- }
-
- System.out.println("HFiles will be stored at " + this.bulkloadDir);
+ bulkloadDir = generateUniqTempDir(false);
+ LOG.info("HFiles will be stored at " + this.bulkloadDir);
HFileOutputFormat2.setOutputPath(job, bulkloadDir);
try (Connection conn = ConnectionFactory.createConnection(getConf());
Admin admin = conn.getAdmin()) {
@@ -165,9 +182,7 @@ public class CopyTable extends Configured implements Tool {
admin.getDescriptor((TableName.valueOf(dstTableName))));
}
} else {
- TableMapReduceUtil.initTableMapperJob(tableName, scan,
- Import.Importer.class, null, null, job);
-
+ initCopyTableMapperReducerJob(job, scan);
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
null);
}
@@ -183,7 +198,7 @@ public class CopyTable extends Configured implements Tool {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
- "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+ "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
System.err.println();
System.err.println("Options:");
System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
@@ -205,6 +220,7 @@ public class CopyTable extends Configured implements Tool {
System.err.println(" all.cells also copy delete markers and deleted cells");
System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
+ "table");
+ System.err.println(" snapshot Copy the data from snapshot to destination table.");
System.err.println();
System.err.println("Args:");
System.err.println(" tablename Name of the table to copy");
@@ -214,6 +230,12 @@ public class CopyTable extends Configured implements Tool {
System.err.println(" $ hbase " +
"org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
"--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
+ System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
+ System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+ + "--snapshot --new.name=destTable sourceTableSnapshot");
+ System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
+ System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+ + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
System.err.println("For performance consider the following general option:\n"
+ " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
+ " decreases the round trip time to the server and may increase performance.\n"
@@ -224,8 +246,6 @@ public class CopyTable extends Configured implements Tool {
}
private boolean doCommandLine(final String[] args) {
- // Process command-line args. TODO: Better cmd-line processing
- // (but hopefully something not as painful as cli options).
if (args.length < 1) {
printUsage(null);
return false;
@@ -313,16 +333,24 @@ public class CopyTable extends Configured implements Tool {
continue;
}
- if (i == args.length-1) {
- tableName = cmd;
+ if (cmd.startsWith("--snapshot")) {
+ readingSnapshot = true;
+ continue;
+ }
+
+ if (i == args.length - 1) {
+ if (readingSnapshot) {
+ snapshot = cmd;
+ } else {
+ tableName = cmd;
+ }
} else {
printUsage("Invalid argument '" + cmd + "'");
return false;
}
}
if (dstTableName == null && peerAddress == null) {
- printUsage("At least a new table name or a " +
- "peer address must be specified");
+ printUsage("At least a new table name or a peer address must be specified");
return false;
}
if ((endTime != 0) && (startTime > endTime)) {
@@ -335,6 +363,22 @@ public class CopyTable extends Configured implements Tool {
return false;
}
+ if (readingSnapshot && peerAddress != null) {
+ printUsage("Loading data from snapshot to remote peer cluster is not supported.");
+ return false;
+ }
+
+ if (readingSnapshot && dstTableName == null) {
+ printUsage("The --new.name=<table> for destination table should be "
+ + "provided when copying data from snapshot .");
+ return false;
+ }
+
+ if (readingSnapshot && snapshot == null) {
+ printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
+ return false;
+ }
+
// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;
@@ -371,6 +415,9 @@ public class CopyTable extends Configured implements Tool {
}
int code = 0;
if (bulkload) {
+ LOG.info("Trying to bulk load data to destination table: " + dstTableName);
+ LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
+ this.bulkloadDir.toString(), this.dstTableName);
code = new LoadIncrementalHFiles(this.getConf())
.run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
if (code == 0) {
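The heart of the snapshot path above is TableMapReduceUtil.initTableSnapshotMapperJob,
which restores the snapshot's store files into the unique temp dir and feeds them to the
mappers without touching the source table's RegionServers. A hedged sketch of the call
as initCopyTableMapperReducerJob issues it (the restore path shown is hypothetical;
generateUniqTempDir actually picks a UUID-named dir under the FS working directory):

    // CellImporter emits cells for HFileOutputFormat2 when bulk loading;
    // Importer emits Puts for a normal table reducer otherwise.
    Class<? extends TableMapper> mapper =
        bulkload ? Import.CellImporter.class : Import.Importer.class;
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "sourceTableSnapshot",   // snapshot to read
        scan,                    // carries versions, row range, families, time range
        mapper,
        null, null,              // output key/value classes configured elsewhere
        job,
        true,                    // ship dependency jars with the job
        new Path("/user/hbase/copytable/<uuid>"));  // unique restore dir

Because each job restores into its own UUID-named directory, concurrent CopyTable runs
cannot collide; this also replaces the old loop that probed random directory names with
fs.exists until it found a free one.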
http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
index ed6857d..5591e5f 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
@@ -24,12 +24,15 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -93,14 +97,9 @@ public class TestCopyTable {
try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
Table t2 = TEST_UTIL.createTable(tableName2, FAMILY)) {
// put rows into the first table
- for (int i = 0; i < 10; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(FAMILY, COLUMN1, COLUMN1);
- t1.put(p);
- }
+ loadData(t1, FAMILY, COLUMN1);
CopyTable copy = new CopyTable();
-
int code;
if (bulkload) {
code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
@@ -114,12 +113,7 @@ public class TestCopyTable {
assertEquals("copy job failed", 0, code);
// verify the data was copied into table 2
- for (int i = 0; i < 10; i++) {
- Get g = new Get(Bytes.toBytes("row" + i));
- Result r = t2.get(g);
- assertEquals(1, r.size());
- assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
- }
+ verifyRows(t2, FAMILY, COLUMN1);
} finally {
TEST_UTIL.deleteTable(tableName1);
TEST_UTIL.deleteTable(tableName2);
@@ -185,7 +179,6 @@ public class TestCopyTable {
t2.getDescriptor().getValues().size());
assertTrue("The mob row count is 0 but should be > 0",
MobTestUtil.countMobRows(t2) > 0);
-
} finally {
TEST_UTIL.deleteTable(tableName1);
TEST_UTIL.deleteTable(tableName2);
@@ -349,4 +342,95 @@ public class TestCopyTable {
args);
return status == 0;
}
+
+ private void loadData(Table t, byte[] family, byte[] column) throws IOException {
+ for (int i = 0; i < 10; i++) {
+ byte[] row = Bytes.toBytes("row" + i);
+ Put p = new Put(row);
+ p.addColumn(family, column, row);
+ t.put(p);
+ }
+ }
+
+ private void verifyRows(Table t, byte[] family, byte[] column) throws IOException {
+ for (int i = 0; i < 10; i++) {
+ byte[] row = Bytes.toBytes("row" + i);
+ Get g = new Get(row).addFamily(family);
+ Result r = t.get(g);
+ Assert.assertNotNull(r);
+ Assert.assertEquals(1, r.size());
+ Cell cell = r.rawCells()[0];
+ Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
+ Assert.assertEquals(0, Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+ cell.getValueLength(), row, 0, row.length));
+ }
+ }
+
+ private Table createTable(TableName tableName, byte[] family, boolean isMob) throws IOException {
+ if (isMob) {
+ ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(family)
+ .setMobEnabled(true).setMobThreshold(1).build();
+ TableDescriptor desc =
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfd).build();
+ return TEST_UTIL.createTable(desc, null);
+ } else {
+ return TEST_UTIL.createTable(tableName, family);
+ }
+ }
+
+ private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad, boolean isMob)
+ throws Exception {
+ TableName table1 = TableName.valueOf(tablePrefix + 1);
+ TableName table2 = TableName.valueOf(tablePrefix + 2);
+ Table t1 = createTable(table1, FAMILY_A, isMob);
+ Table t2 = createTable(table2, FAMILY_A, isMob);
+ loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
+ String snapshot = tablePrefix + "_snapshot";
+ TEST_UTIL.getAdmin().snapshot(snapshot, table1);
+ boolean success;
+ if (bulkLoad) {
+ success =
+ runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot });
+ } else {
+ success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot });
+ }
+ Assert.assertTrue(success);
+ verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
+ }
+
+ @Test
+ public void testLoadingSnapshotToTable() throws Exception {
+ testCopyTableBySnapshot("testLoadingSnapshotToTable", false, false);
+ }
+
+ @Test
+ public void testLoadingSnapshotToMobTable() throws Exception {
+ testCopyTableBySnapshot("testLoadingSnapshotToMobTable", false, true);
+ }
+
+ @Test
+ public void testLoadingSnapshotAndBulkLoadToTable() throws Exception {
+ testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToTable", true, false);
+ }
+
+ @Test
+ public void testLoadingSnapshotAndBulkLoadToMobTable() throws Exception {
+ testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToMobTable", true, true);
+ }
+
+ @Test
+ public void testLoadingSnapshotToRemoteCluster() throws Exception {
+ Assert.assertFalse(runCopy(
+ new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" }));
+ }
+
+ @Test
+ public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
+ Assert.assertFalse(runCopy(new String[] { "--snapshot", "--new.name=destTable" }));
+ }
+
+ @Test
+ public void testLoadingSnapshotWithoutDestTable() throws Exception {
+ Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" }));
+ }
}
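The new tests take the snapshot with the Admin API before running the copy; outside a
test the flow is the same. A brief sketch (connection, configuration, and names are
hypothetical):

    // Snapshot the source table first; CopyTable then reads the snapshot files,
    // so the copy adds no read load to the source table's RegionServers.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      admin.snapshot("sourceTableSnapshot", TableName.valueOf("sourceTable"));
    }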
http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 7a1a578..23a2399 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,14 +51,21 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
- throws IOException {
+ throws IOException {
// region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
// open region from the snapshot directory
- this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+ region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
+ hri, htd, null);
+ // The MobFileCache is not initialized when we are not running in an RS process, so
+ // provide an initialized cache here. Consider the case where a CF is switched from mob
+ // to non-mob: if we only initialized the cache for mob regions, HMobStore could still
+ // throw an NPE. So initialize the cache for every region, even one without any mob CF;
+ // the cache is very lightweight anyway.
+ region.setMobFileCache(new MobFileCache(conf));
+ region.initialize();
// create an internal region scanner
this.scanner = region.getScanner(scan);
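ClientSideRegionScanner is what ultimately reads the restored snapshot regions in this
mode: it opens an HRegion directly against the files on the filesystem, with no
RegionServer involved, which is why the MobFileCache must now be supplied by hand. A
hedged usage sketch (the restored root dir, table descriptor, and region info are
assumed to come from the restored snapshot's manifest):

    // Scan one restored region of the snapshot straight from the filesystem.
    try (ClientSideRegionScanner scanner = new ClientSideRegionScanner(
        conf, fs, restoredRootDir, tableDescriptor, regionInfo, new Scan(), null)) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        // process the row
      }
    }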
http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dc0fa22..9bf9309 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7055,7 +7055,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param htd the table descriptor
* @return the new instance
*/
- static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
+ public static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
Configuration conf, RegionInfo regionInfo, final TableDescriptor htd,
RegionServerServices rsServices) {
try {