Posted to commits@hbase.apache.org by ap...@apache.org on 2019/01/16 18:08:40 UTC

[hbase] branch branch-1 updated: HBASE-21675 Port HBASE-21642 (CopyTable by reading snapshot and bulkloading will save a lot of time) to branch-1

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new cd0645a  HBASE-21675 Port HBASE-21642 (CopyTable by reading snapshot and bulkloading will save a lot of time) to branch-1
cd0645a is described below

commit cd0645a5f6c0e15d5571e30b63dff175e318f4bd
Author: openinx <op...@gmail.com>
AuthorDate: Tue Jan 15 15:34:39 2019 -0800

    HBASE-21675 Port HBASE-21642 (CopyTable by reading snapshot and bulkloading will save a lot of time) to branch-1
    
    HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../apache/hadoop/hbase/mapreduce/CopyTable.java   | 110 +++++++++++-----
 .../hadoop/hbase/mapreduce/TestCopyTable.java      | 143 ++++++++++++++-------
 2 files changed, 178 insertions(+), 75 deletions(-)
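
For reference, the new mode is driven like the existing table mode, with a snapshot
name taking the place of the table name on the command line. A minimal sketch, assuming
a snapshot named 'sourceTableSnapshot' has already been taken and a destination table
'destTable' with the same column families already exists:

 $ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
     --snapshot --new.name=destTable sourceTableSnapshot

 $ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
     --snapshot --bulkload --new.name=destTable sourceTableSnapshot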

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index f63ed3e..3f042a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
+import org.apache.hadoop.hbase.mapreduce.Import.Importer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.mapreduce.Job;
@@ -74,11 +76,39 @@ public class CopyTable extends Configured implements Tool {
   boolean bulkload = false;
   Path bulkloadDir = null;
 
+  boolean readingSnapshot = false;
+  String snapshot = null;
+
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
   public CopyTable(Configuration conf) {
     super(conf);
   }
+
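+  /**
+   * Returns a unique, randomly named subdirectory under {hbase.fs.tmp.dir}/copytable,
+   * creating the parent if it is missing. Used both as the snapshot restore directory
+   * and as the bulk load staging directory.
+   * @param withDirCreated whether to also create the returned directory on the filesystem
+   */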
+  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
+    FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
+    Path dir = new Path(getConf().get(HConstants.TEMPORARY_FS_DIRECTORY_KEY), NAME);
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir);
+    }
+    Path newDir = new Path(dir, UUID.randomUUID().toString());
+    if (withDirCreated) {
+      fs.mkdirs(newDir);
+    }
+    return newDir;
+  }
+
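+  /**
+   * Sets up the map phase: reads from a restored snapshot when --snapshot was given,
+   * otherwise scans the live source table. Bulk load jobs use KeyValueImporter (emits
+   * KeyValues for HFileOutputFormat2); direct copies use Importer (emits Mutations).
+   */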
+  @SuppressWarnings({"rawtypes","unchecked"})
+  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
+    Class mapper = bulkload ? KeyValueImporter.class : Importer.class;
+    if (readingSnapshot) {
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
+        generateUniqTempDir(true));
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
+    }
+  }
+
   /**
    * Sets up the actual job.
    *
@@ -86,14 +116,13 @@ public class CopyTable extends Configured implements Tool {
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
-  public Job createSubmittableJob(String[] args)
-  throws IOException {
+  public Job createSubmittableJob(String[] args) throws IOException {
     if (!doCommandLine(args)) {
       return null;
     }
 
-    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
-
+    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
+    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
     job.setJarByClass(CopyTable.class);
     Scan scan = new Scan();
     scan.setBatch(batch);
@@ -146,33 +175,20 @@ public class CopyTable extends Configured implements Tool {
     job.setNumReduceTasks(0);
 
     if (bulkload) {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
-        null, job);
+      initCopyTableMapperReducerJob(job, scan);
 
       // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
       TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
 
-      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
-      Random rand = new Random();
-      Path root = new Path(fs.getWorkingDirectory(), "copytable");
-      fs.mkdirs(root);
-      while (true) {
-        bulkloadDir = new Path(root, "" + rand.nextLong());
-        if (!fs.exists(bulkloadDir)) {
-          break;
-        }
-      }
-
-      System.out.println("HFiles will be stored at " + this.bulkloadDir);
+      bulkloadDir = generateUniqTempDir(false);
+      LOG.info("HFiles will be stored at " + this.bulkloadDir);
       HFileOutputFormat2.setOutputPath(job, bulkloadDir);
       try (Connection conn = ConnectionFactory.createConnection(getConf());
           Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
         HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
       }
     } else {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Import.Importer.class, null, null, job);
-
+      initCopyTableMapperReducerJob(job, scan);
       TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
         null);
     }
@@ -188,7 +204,7 @@ public class CopyTable extends Configured implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
-        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+        "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
@@ -209,6 +225,7 @@ public class CopyTable extends Configured implements Tool {
     System.err.println(" all.cells    also copy delete markers and deleted cells");
     System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
         + "table");
+    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" tablename    Name of the table to copy");
@@ -218,6 +235,12 @@ public class CopyTable extends Configured implements Tool {
     System.err.println(" $ bin/hbase " +
         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
+    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
+    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+        + "--snapshot --new.name=destTable sourceTableSnapshot");
+    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
+    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+        + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
     System.err.println("For performance consider the following general option:\n"
         + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
         + "  decreases the round trip time to the server and may increase performance.\n"
@@ -228,8 +251,6 @@ public class CopyTable extends Configured implements Tool {
   }
 
   private boolean doCommandLine(final String[] args) {
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
     if (args.length < 1) {
       printUsage(null);
       return false;
@@ -317,16 +338,24 @@ public class CopyTable extends Configured implements Tool {
           continue;
         }
 
-        if (i == args.length-1) {
-          tableName = cmd;
+        if(cmd.startsWith("--snapshot")){
+          readingSnapshot = true;
+          continue;
+        }
+
+        if (i == args.length - 1) {
+          if (readingSnapshot) {
+            snapshot = cmd;
+          } else {
+            tableName = cmd;
+          }
         } else {
           printUsage("Invalid argument '" + cmd + "'" );
           return false;
         }
       }
       if (dstTableName == null && peerAddress == null) {
-        printUsage("At least a new table name or a " +
-            "peer address must be specified");
+        printUsage("At least a new table name or a peer address must be specified");
         return false;
       }
       if ((endTime != 0) && (startTime > endTime)) {
@@ -339,6 +368,22 @@ public class CopyTable extends Configured implements Tool {
         return false;
       }
 
+      if (readingSnapshot && peerAddress != null) {
+        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
+        return false;
+      }
+
+      if (readingSnapshot && dstTableName == null) {
+        printUsage("The --new.name=<table> for destination table should be "
+            + "provided when copying data from snapshot .");
+        return false;
+      }
+
+      if (readingSnapshot && snapshot == null) {
+        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
+        return false;
+      }
+
       // set dstTableName if necessary
       if (dstTableName == null) {
         dstTableName = tableName;
@@ -376,8 +421,11 @@ public class CopyTable extends Configured implements Tool {
     }
     int code = 0;
     if (bulkload) {
-      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
-          this.dstTableName});
+      LOG.info("Trying to bulk load data to destination table: " + dstTableName
+          + ", command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles "
+          + bulkloadDir.toString() + " " + dstTableName);
+      code = new LoadIncrementalHFiles(this.getConf())
+          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
       if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful so that one can
        // rerun LoadIncrementalHFiles.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
index ea9de87..bba7401 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -38,9 +39,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -77,40 +78,30 @@ public class TestCopyTable {
     final byte[] FAMILY = Bytes.toBytes("family");
     final byte[] COLUMN1 = Bytes.toBytes("c1");
 
-    Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
-    Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
-
-    // put rows into the first table
-    for (int i = 0; i < 10; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.add(FAMILY, COLUMN1, COLUMN1);
-      t1.put(p);
-    }
-
-    CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
-
-    int code;
-    if (bulkload) {
-      code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
-          "--bulkload", TABLENAME1.getNameAsString() });
-    } else {
-      code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
-          TABLENAME1.getNameAsString() });
-    }
-    assertEquals("copy job failed", 0, code);
-
-    // verify the data was copied into table 2
-    for (int i = 0; i < 10; i++) {
-      Get g = new Get(Bytes.toBytes("row" + i));
-      Result r = t2.get(g);
-      assertEquals(1, r.size());
-      assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
+    try (Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
+         Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY)) {
+      // put rows into the first table
+      loadData(t1, FAMILY, COLUMN1);
+
+      CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
+      int code;
+      if (bulkload) {
+        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
+            copy, new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
+            "--bulkload", TABLENAME1.getNameAsString() });
+      } else {
+        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
+            copy, new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
+            TABLENAME1.getNameAsString() });
+      }
+      assertEquals("copy job failed", 0, code);
+
+      // verify the data was copied into table 2
+      verifyRows(t2, FAMILY, COLUMN1);
+    } finally {
+      TEST_UTIL.deleteTable(TABLENAME1);
+      TEST_UTIL.deleteTable(TABLENAME2);
     }
-
-    t1.close();
-    t2.close();
-    TEST_UTIL.deleteTable(TABLENAME1);
-    TEST_UTIL.deleteTable(TABLENAME2);
   }
 
   /**
@@ -251,14 +242,78 @@ public class TestCopyTable {
     assertTrue(data.toString().contains("Usage:"));
   }
 
-  private boolean runCopy(String[] args) throws IOException, InterruptedException,
-      ClassNotFoundException {
-    GenericOptionsParser opts = new GenericOptionsParser(
-        new Configuration(TEST_UTIL.getConfiguration()), args);
-    Configuration configuration = opts.getConfiguration();
-    args = opts.getRemainingArgs();
-    Job job = new CopyTable(configuration).createSubmittableJob(args);
-    job.waitForCompletion(false);
-    return job.isSuccessful();
+  private boolean runCopy(String[] args) throws Exception {
+    CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
+    int code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, args);
+    return code == 0;
+  }
+
+  private void loadData(Table t, byte[] family, byte[] column) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put p = new Put(row);
+      p.addColumn(family, column, row);
+      t.put(p);
+    }
+  }
+
+  private void verifyRows(Table t, byte[] family, byte[] column) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Get g = new Get(row).addFamily(family);
+      Result r = t.get(g);
+      Assert.assertNotNull(r);
+      Assert.assertEquals(1, r.size());
+      Cell cell = r.rawCells()[0];
+      Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
+      Assert.assertEquals(0, Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+        cell.getValueLength(), row, 0, row.length));
+    }
+  }
+
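+  /**
+   * Creates two tables, loads rows into the first, snapshots it, then runs CopyTable
+   * with --snapshot (optionally with --bulkload) to copy the snapshot into the second
+   * table, and verifies the copied rows.
+   */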
+  private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad)
+      throws Exception {
+    TableName table1 = TableName.valueOf(tablePrefix + 1);
+    TableName table2 = TableName.valueOf(tablePrefix + 2);
+    Table t1 = TEST_UTIL.createTable(table1, FAMILY_A);
+    Table t2 = TEST_UTIL.createTable(table2, FAMILY_A);
+    loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
+    String snapshot = tablePrefix + "_snapshot";
+    TEST_UTIL.getHBaseAdmin().snapshot(snapshot, table1);
+    boolean success;
+    if (bulkLoad) {
+      success =
+          runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot });
+    } else {
+      success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot });
+    }
+    Assert.assertTrue(success);
+    verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
+  }
+
+  @Test
+  public void testLoadingSnapshotToTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotToTable", false);
+  }
+
+  @Test
+  public void testLoadingSnapshotAndBulkLoadToTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToTable", true);
+  }
+
+  @Test
+  public void testLoadingSnapshotToRemoteCluster() throws Exception {
+    Assert.assertFalse(runCopy(
+      new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" }));
+  }
+
+  @Test
+  public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
+    Assert.assertFalse(runCopy(new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase" }));
+  }
+
+  @Test
+  public void testLoadingSnapshotWithoutDestTable() throws Exception {
+    Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" }));
   }
 }
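
If the final LoadIncrementalHFiles step fails, the staging directory is kept (see the
run() hunk above) so the bulk load can be retried by hand, along the lines of the
command the job logs. A sketch with a placeholder staging path; note that in branch-1
the tool lives in the mapreduce package:

 $ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
     /user/hbase/hbase-staging/copytable/<uuid> destTable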