You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/31 12:46:09 UTC

[33/47] hbase git commit: HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time

HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c2d5991b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c2d5991b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c2d5991b

Branch: refs/heads/HBASE-21512
Commit: c2d5991b82e3b807cb11f5735ef5068b73720725
Parents: c552088
Author: huzheng <op...@gmail.com>
Authored: Wed Dec 26 16:17:55 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Thu Dec 27 18:22:54 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/CopyTable.java       | 109 ++++++++++++------
 .../hadoop/hbase/mapreduce/TestCopyTable.java   | 110 ++++++++++++++++---
 .../hbase/client/ClientSideRegionScanner.java   |  14 ++-
 .../hadoop/hbase/regionserver/HRegion.java      |   2 +-
 4 files changed, 187 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 4e57f54..b59c9e6 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
+import java.util.UUID;
 
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
+import org.apache.hadoop.hbase.mapreduce.Import.Importer;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -70,8 +72,34 @@ public class CopyTable extends Configured implements Tool {
   boolean bulkload = false;
   Path bulkloadDir = null;
 
+  boolean readingSnapshot = false;
+  String snapshot = null;
+
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
+    FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
+    Path dir = new Path(fs.getWorkingDirectory(), NAME);
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir);
+    }
+    Path newDir = new Path(dir, UUID.randomUUID().toString());
+    if (withDirCreated) {
+      fs.mkdirs(newDir);
+    }
+    return newDir;
+  }
+
+  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
+    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
+    if (readingSnapshot) {
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
+        generateUniqTempDir(true));
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
+    }
+  }
+
   /**
    * Sets up the actual job.
    *
@@ -79,13 +107,13 @@ public class CopyTable extends Configured implements Tool {
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
-  public Job createSubmittableJob(String[] args)
-  throws IOException {
+  public Job createSubmittableJob(String[] args) throws IOException {
     if (!doCommandLine(args)) {
       return null;
     }
 
-    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
+    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
     job.setJarByClass(CopyTable.class);
     Scan scan = new Scan();
 
@@ -107,15 +135,15 @@ public class CopyTable extends Configured implements Tool {
       job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
     }
     if (versions >= 0) {
-      scan.setMaxVersions(versions);
+      scan.readVersions(versions);
     }
 
     if (startRow != null) {
-      scan.setStartRow(Bytes.toBytesBinary(startRow));
+      scan.withStartRow(Bytes.toBytesBinary(startRow));
     }
 
     if (stopRow != null) {
-      scan.setStopRow(Bytes.toBytesBinary(stopRow));
+      scan.withStopRow(Bytes.toBytesBinary(stopRow));
     }
 
     if(families != null) {
@@ -140,24 +168,13 @@ public class CopyTable extends Configured implements Tool {
     job.setNumReduceTasks(0);
 
     if (bulkload) {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
-        job);
+      initCopyTableMapperReducerJob(job, scan);
 
       // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
       TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
 
-      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
-      Random rand = new Random();
-      Path root = new Path(fs.getWorkingDirectory(), "copytable");
-      fs.mkdirs(root);
-      while (true) {
-        bulkloadDir = new Path(root, "" + rand.nextLong());
-        if (!fs.exists(bulkloadDir)) {
-          break;
-        }
-      }
-
-      System.out.println("HFiles will be stored at " + this.bulkloadDir);
+      bulkloadDir = generateUniqTempDir(false);
+      LOG.info("HFiles will be stored at " + this.bulkloadDir);
       HFileOutputFormat2.setOutputPath(job, bulkloadDir);
       try (Connection conn = ConnectionFactory.createConnection(getConf());
           Admin admin = conn.getAdmin()) {
@@ -165,9 +182,7 @@ public class CopyTable extends Configured implements Tool {
           admin.getDescriptor((TableName.valueOf(dstTableName))));
       }
     } else {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Import.Importer.class, null, null, job);
-
+      initCopyTableMapperReducerJob(job, scan);
       TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
         null);
     }
@@ -183,7 +198,7 @@ public class CopyTable extends Configured implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
-        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+        "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
@@ -205,6 +220,7 @@ public class CopyTable extends Configured implements Tool {
     System.err.println(" all.cells    also copy delete markers and deleted cells");
     System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
         + "table");
+    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" tablename    Name of the table to copy");
@@ -214,6 +230,12 @@ public class CopyTable extends Configured implements Tool {
     System.err.println(" $ hbase " +
         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
+    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
+    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+        + "--snapshot --new.name=destTable sourceTableSnapshot");
+    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
+    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+        + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
     System.err.println("For performance consider the following general option:\n"
         + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
         + "  decreases the round trip time to the server and may increase performance.\n"
@@ -224,8 +246,6 @@ public class CopyTable extends Configured implements Tool {
   }
 
   private boolean doCommandLine(final String[] args) {
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
     if (args.length < 1) {
       printUsage(null);
       return false;
@@ -313,16 +333,24 @@ public class CopyTable extends Configured implements Tool {
           continue;
         }
 
-        if (i == args.length-1) {
-          tableName = cmd;
+        if(cmd.startsWith("--snapshot")){
+          readingSnapshot = true;
+          continue;
+        }
+
+        if (i == args.length - 1) {
+          if (readingSnapshot) {
+            snapshot = cmd;
+          } else {
+            tableName = cmd;
+          }
         } else {
           printUsage("Invalid argument '" + cmd + "'");
           return false;
         }
       }
       if (dstTableName == null && peerAddress == null) {
-        printUsage("At least a new table name or a " +
-            "peer address must be specified");
+        printUsage("At least a new table name or a peer address must be specified");
         return false;
       }
       if ((endTime != 0) && (startTime > endTime)) {
@@ -335,6 +363,22 @@ public class CopyTable extends Configured implements Tool {
         return false;
       }
 
+      if (readingSnapshot && peerAddress != null) {
+        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
+        return false;
+      }
+
+      if (readingSnapshot && dstTableName == null) {
+        printUsage("The --new.name=<table> for destination table should be "
+            + "provided when copying data from snapshot .");
+        return false;
+      }
+
+      if (readingSnapshot && snapshot == null) {
+        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
+        return false;
+      }
+
       // set dstTableName if necessary
       if (dstTableName == null) {
         dstTableName = tableName;
@@ -371,6 +415,9 @@ public class CopyTable extends Configured implements Tool {
     }
     int code = 0;
     if (bulkload) {
+      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
+      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
+        this.bulkloadDir.toString(), this.dstTableName);
       code = new LoadIncrementalHFiles(this.getConf())
           .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
       if (code == 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
index ed6857d..5591e5f 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
@@ -24,12 +24,15 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.util.LauncherSecurityManager;
 import org.apache.hadoop.util.ToolRunner;
 
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -93,14 +97,9 @@ public class TestCopyTable {
     try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
          Table t2 = TEST_UTIL.createTable(tableName2, FAMILY)) {
       // put rows into the first table
-      for (int i = 0; i < 10; i++) {
-        Put p = new Put(Bytes.toBytes("row" + i));
-        p.addColumn(FAMILY, COLUMN1, COLUMN1);
-        t1.put(p);
-      }
+      loadData(t1, FAMILY, COLUMN1);
 
       CopyTable copy = new CopyTable();
-
       int code;
       if (bulkload) {
         code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
@@ -114,12 +113,7 @@ public class TestCopyTable {
       assertEquals("copy job failed", 0, code);
 
       // verify the data was copied into table 2
-      for (int i = 0; i < 10; i++) {
-        Get g = new Get(Bytes.toBytes("row" + i));
-        Result r = t2.get(g);
-        assertEquals(1, r.size());
-        assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
-      }
+      verifyRows(t2, FAMILY, COLUMN1);
     } finally {
       TEST_UTIL.deleteTable(tableName1);
       TEST_UTIL.deleteTable(tableName2);
@@ -185,7 +179,6 @@ public class TestCopyTable {
               t2.getDescriptor().getValues().size());
       assertTrue("The mob row count is 0 but should be > 0",
               MobTestUtil.countMobRows(t2) > 0);
-
     } finally {
       TEST_UTIL.deleteTable(tableName1);
       TEST_UTIL.deleteTable(tableName2);
@@ -349,4 +342,95 @@ public class TestCopyTable {
         args);
     return status == 0;
   }
+
+  private void loadData(Table t, byte[] family, byte[] column) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put p = new Put(row);
+      p.addColumn(family, column, row);
+      t.put(p);
+    }
+  }
+
+  private void verifyRows(Table t, byte[] family, byte[] column) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Get g = new Get(row).addFamily(family);
+      Result r = t.get(g);
+      Assert.assertNotNull(r);
+      Assert.assertEquals(1, r.size());
+      Cell cell = r.rawCells()[0];
+      Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
+      Assert.assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+        cell.getValueLength(), row, 0, row.length), 0);
+    }
+  }
+
+  private Table createTable(TableName tableName, byte[] family, boolean isMob) throws IOException {
+    if (isMob) {
+      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(family)
+          .setMobEnabled(true).setMobThreshold(1).build();
+      TableDescriptor desc =
+          TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfd).build();
+      return TEST_UTIL.createTable(desc, null);
+    } else {
+      return TEST_UTIL.createTable(tableName, family);
+    }
+  }
+
+  private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad, boolean isMob)
+      throws Exception {
+    TableName table1 = TableName.valueOf(tablePrefix + 1);
+    TableName table2 = TableName.valueOf(tablePrefix + 2);
+    Table t1 = createTable(table1, FAMILY_A, isMob);
+    Table t2 = createTable(table2, FAMILY_A, isMob);
+    loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
+    String snapshot = tablePrefix + "_snapshot";
+    TEST_UTIL.getAdmin().snapshot(snapshot, table1);
+    boolean success;
+    if (bulkLoad) {
+      success =
+          runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot });
+    } else {
+      success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot });
+    }
+    Assert.assertTrue(success);
+    verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
+  }
+
+  @Test
+  public void testLoadingSnapshotToTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotToTable", false, false);
+  }
+
+  @Test
+  public void tsetLoadingSnapshotToMobTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotToMobTable", false, true);
+  }
+
+  @Test
+  public void testLoadingSnapshotAndBulkLoadToTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToTable", true, false);
+  }
+
+  @Test
+  public void testLoadingSnapshotAndBulkLoadToMobTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToMobTable", true, true);
+  }
+
+  @Test
+  public void testLoadingSnapshotToRemoteCluster() throws Exception {
+    Assert.assertFalse(runCopy(
+      new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" }));
+  }
+
+  @Test
+  public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
+    Assert.assertFalse(runCopy(new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase" }));
+  }
+
+  @Test
+  public void testLoadingSnapshotWithoutDestTable() throws Exception {
+    Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" }));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 7a1a578..23a2399 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,14 +51,21 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
 
   public ClientSideRegionScanner(Configuration conf, FileSystem fs,
       Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
-          throws IOException {
+      throws IOException {
     // region is immutable, set isolation level
     scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
 
     htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
 
     // open region from the snapshot directory
-    this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+    region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
+      hri, htd, null);
+    // The MobFileCache is not initialized when we are not running in an RS process, so provide
+    // an initialized cache here. Consider the case where a CF was changed from MOB to non-MOB:
+    // if we only initialized the cache for MOB regions, the NPE from HMobStore would still
+    // happen. So initialize it for every region, even one with no MOB CF; it is very lightweight.
+    region.setMobFileCache(new MobFileCache(conf));
+    region.initialize();
 
     // create an internal region scanner
     this.scanner = region.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c2d5991b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dc0fa22..9bf9309 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7055,7 +7055,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param htd the table descriptor
    * @return the new instance
    */
-  static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
+  public static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
       Configuration conf, RegionInfo regionInfo, final TableDescriptor htd,
       RegionServerServices rsServices) {
     try {