You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/10/07 21:34:43 UTC

git commit: HBASE-11997 CopyTable with bulkload (Yi Deng)

Repository: hbase
Updated Branches:
  refs/heads/master 408de0fbb -> e1b69bd54


HBASE-11997 CopyTable with bulkload (Yi Deng)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1b69bd5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1b69bd5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1b69bd5

Branch: refs/heads/master
Commit: e1b69bd548bf67913ba58ef45f8bd85e743e12ce
Parents: 408de0f
Author: Ted Yu <te...@apache.org>
Authored: Tue Oct 7 19:34:31 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Tue Oct 7 19:34:31 2014 +0000

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/CopyTable.java       | 129 +++++++++++++++----
 .../hbase/mapreduce/HFileOutputFormat2.java     |  18 +++
 .../hbase/mapreduce/TableInputFormat.java       |  35 ++++-
 .../hbase/mapreduce/TableInputFormatBase.java   |  11 +-
 .../hadoop/hbase/mapreduce/TestCopyTable.java   |  64 +++++----
 5 files changed, 192 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1b69bd5/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 6f985d8..f7ff0e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -21,14 +21,23 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -43,18 +52,22 @@ import org.apache.hadoop.util.ToolRunner;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class CopyTable extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(CopyTable.class);
 
   final static String NAME = "copytable";
-  static long startTime = 0;
-  static long endTime = 0;
-  static int versions = -1;
-  static String tableName = null;
-  static String startRow = null;
-  static String stopRow = null;
-  static String newTableName = null;
-  static String peerAddress = null;
-  static String families = null;
-  static boolean allCells = false;
+  long startTime = 0;
+  long endTime = 0;
+  int versions = -1;
+  String tableName = null;
+  String startRow = null;
+  String stopRow = null;
+  String dstTableName = null;
+  String peerAddress = null;
+  String families = null;
+  boolean allCells = false;
+  
+  boolean bulkload = false;
+  Path bulkloadDir = null;
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
@@ -64,17 +77,17 @@ public class CopyTable extends Configured implements Tool {
   /**
    * Sets up the actual job.
    *
-   * @param conf  The current configuration.
    * @param args  The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
+  public Job createSubmittableJob(String[] args)
   throws IOException {
     if (!doCommandLine(args)) {
       return null;
     }
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    
+    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
     job.setJarByClass(CopyTable.class);
     Scan scan = new Scan();
     scan.setCacheBlocks(false);
@@ -116,12 +129,40 @@ public class CopyTable extends Configured implements Tool {
       }
       Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
     }
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Import.Importer.class, null, null, job);
-    TableMapReduceUtil.initTableReducerJob(
-        newTableName == null ? tableName : newTableName, null, job,
-        null, peerAddress, null, null);
     job.setNumReduceTasks(0);
+    
+    if (bulkload) {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
+        null, job);
+      
+      // Split input on the destination table's region boundaries so Map output can be bulk-loaded.
+      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
+      
+      FileSystem fs = FileSystem.get(getConf());
+      Random rand = new Random();
+      Path root = new Path(fs.getWorkingDirectory(), "copytable");
+      fs.mkdirs(root);
+      while (true) {
+        bulkloadDir = new Path(root, "" + rand.nextLong());
+        if (!fs.exists(bulkloadDir)) {
+          break;
+        }
+      }
+      
+      System.out.println("HFiles will be stored at " + this.bulkloadDir);
+      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+          Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
+        HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
+      }
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        Import.Importer.class, null, null, job);
+      
+      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
+        null);
+    }
+    
     return job;
   }
 
@@ -152,6 +193,8 @@ public class CopyTable extends Configured implements Tool {
     System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
     System.err.println("              To keep the same name, just give \"cfName\"");
     System.err.println(" all.cells    also copy delete markers and deleted cells");
+    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
+        + "table");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" tablename    Name of the table to copy");
@@ -170,7 +213,7 @@ public class CopyTable extends Configured implements Tool {
         + "    -Dmapreduce.map.speculative=false");
   }
 
-  private static boolean doCommandLine(final String[] args) {
+  private boolean doCommandLine(final String[] args) {
     // Process command-line args. TODO: Better cmd-line processing
     // (but hopefully something not as painful as cli options).
     if (args.length < 1) {
@@ -217,7 +260,7 @@ public class CopyTable extends Configured implements Tool {
 
         final String newNameArgKey = "--new.name=";
         if (cmd.startsWith(newNameArgKey)) {
-          newTableName = cmd.substring(newNameArgKey.length());
+          dstTableName = cmd.substring(newNameArgKey.length());
           continue;
         }
 
@@ -237,6 +280,11 @@ public class CopyTable extends Configured implements Tool {
           allCells = true;
           continue;
         }
+        
+        if (cmd.startsWith("--bulkload")) {
+          bulkload = true;
+          continue;
+        }
 
         if (i == args.length-1) {
           tableName = cmd;
@@ -245,7 +293,7 @@ public class CopyTable extends Configured implements Tool {
           return false;
         }
       }
-      if (newTableName == null && peerAddress == null) {
+      if (dstTableName == null && peerAddress == null) {
         printUsage("At least a new table name or a " +
             "peer address must be specified");
         return false;
@@ -254,6 +302,16 @@ public class CopyTable extends Configured implements Tool {
         printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
         return false;
       }
+      
+      if (bulkload && peerAddress != null) {
+        printUsage("Remote bulkload is not supported!");
+        return false;
+      }
+      
+      // set dstTableName if necessary
+      if (dstTableName == null) {
+        dstTableName = tableName;
+      }
     } catch (Exception e) {
       e.printStackTrace();
       printUsage("Can't start because " + e.getMessage());
@@ -276,8 +334,29 @@ public class CopyTable extends Configured implements Tool {
   @Override
   public int run(String[] args) throws Exception {
     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
-    Job job = createSubmittableJob(getConf(), otherArgs);
+    Job job = createSubmittableJob(otherArgs);
     if (job == null) return 1;
-    return job.waitForCompletion(true) ? 0 : 1;
+    if (!job.waitForCompletion(true)) {
+      LOG.info("Map-reduce job failed!");
+      if (bulkload) {
+        LOG.info("Files are not bulkloaded!");
+      }
+      return 1;
+    }
+    int code = 0;
+    if (bulkload) {
+      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
+          this.dstTableName});
+      if (code == 0) {
+        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
+        // rerun LoadIncrementalHFiles on the remaining files after a failure.
+        FileSystem fs = FileSystem.get(this.getConf());
+        if (!fs.delete(this.bulkloadDir, true)) {
+          LOG.error("Deleting folder " + bulkloadDir + " failed!");
+          code = 1;
+        }
+      }
+    }
+    return code;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1b69bd5/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index f8f9b4d..2c0efc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -404,6 +404,24 @@ public class HFileOutputFormat2
     LOG.info("Incremental table " + Bytes.toString(table.getTableName())
       + " output configured.");
   }
+  
+  public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
+    Configuration conf = job.getConfiguration();
+
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat2.class);
+
+    // Set compression, bloom type, block size and data block encoding based on column families
+    configureCompression(table, conf);
+    configureBloomType(table, conf);
+    configureBlockSize(table, conf);
+    configureDataBlockEncoding(table, conf);
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
+    LOG.info("Incremental table " + table.getName() + " output configured.");
+  }
 
   /**
    * Runs inside the task to deserialize column family to compression algorithm

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1b69bd5/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index c7fa29e..42a231b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -22,15 +22,20 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -45,6 +50,11 @@ implements Configurable {
 
   /** Job parameter that specifies the input table. */
   public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
+  /**
+   * If specified, use start keys of this table to split.
+   * This is useful when you are preparing data for bulkload.
+   */
+  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
   /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
    * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
    */
@@ -103,7 +113,7 @@ implements Configurable {
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
     }
-
+    
     Scan scan = null;
 
     if (conf.get(SCAN) != null) {
@@ -214,4 +224,23 @@ implements Configurable {
     }
   }
 
+  @Override
+  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
+    if (conf.get(SPLIT_TABLE) != null) {
+      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+          RegionLocator rl = conn.getRegionLocator(splitTableName)) {
+        return rl.getStartEndKeys();
+      }
+    }
+
+    return super.getStartEndKeys();
+  }
+  
+  /**
+   * Sets split table in map-reduce job.
+   */
+  public static void configureSplitTable(Job job, TableName tableName) {
+    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1b69bd5/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index c196eed..5dcfe00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -101,8 +101,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   private RegionLocator regionLocator;
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
-
-
+  
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap<InetAddress, String> reverseDNSCacheMap =
     new HashMap<InetAddress, String>();
@@ -142,6 +141,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     trr.setTable(table);
     return trr;
   }
+  
+  protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
+    return regionLocator.getStartEndKeys();
+  }
 
   /**
    * Calculates the splits that will serve as input for the map tasks. The
@@ -160,8 +163,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
 
     RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table);
-
-    Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
+    
+    Pair<byte[][], byte[][]> keys = getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
       HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1b69bd5/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
index fac90a8..4b11abb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
@@ -31,13 +31,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
 import org.apache.hadoop.mapreduce.Job;
@@ -53,7 +52,6 @@ import org.junit.experimental.categories.Category;
 @Category({MapReduceTests.class, LargeTests.class})
 public class TestCopyTable {
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static MiniHBaseCluster cluster;
   private static final byte[] ROW1 = Bytes.toBytes("row1");
   private static final byte[] ROW2 = Bytes.toBytes("row2");
   private static final String FAMILY_A_STRING = "a";
@@ -65,7 +63,7 @@ public class TestCopyTable {
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    cluster = TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.startMiniMapReduceCluster();
   }
 
@@ -75,12 +73,7 @@ public class TestCopyTable {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  /**
-   * Simple end-to-end test
-   * @throws Exception
-   */
-  @Test
-  public void testCopyTable() throws Exception {
+  private void doCopyTableTest(boolean bulkload) throws Exception {
     final TableName TABLENAME1 = TableName.valueOf("testCopyTable1");
     final TableName TABLENAME2 = TableName.valueOf("testCopyTable2");
     final byte[] FAMILY = Bytes.toBytes("family");
@@ -98,10 +91,15 @@ public class TestCopyTable {
 
     CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
 
-    assertEquals(
-      0,
-      copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
-          TABLENAME1.getNameAsString() }));
+    int code;
+    if (bulkload) {
+      code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
+          "--bulkload", TABLENAME1.getNameAsString() });
+    } else {
+      code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
+          TABLENAME1.getNameAsString() });
+    }
+    assertEquals("copy job failed", 0, code);
 
     // verify the data was copied into table 2
     for (int i = 0; i < 10; i++) {
@@ -117,6 +115,23 @@ public class TestCopyTable {
     TEST_UTIL.deleteTable(TABLENAME2);
   }
 
+  /**
+   * Simple end-to-end test
+   * @throws Exception
+   */
+  @Test
+  public void testCopyTable() throws Exception {
+    doCopyTableTest(false);
+  }
+  
+  /**
+   * Simple end-to-end test with bulkload.
+   */
+  @Test
+  public void testCopyTableWithBulkload() throws Exception {
+    doCopyTableTest(true);
+  }
+  
   @Test
   public void testStartStopRow() throws Exception {
     final TableName TABLENAME1 = TableName.valueOf("testStartStopRow1");
@@ -196,7 +211,6 @@ public class TestCopyTable {
         "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000),
         "--versions=1", sourceTable };
     assertNull(t2.get(new Get(ROW1)).getRow());
-    clean();
 
     assertTrue(runCopy(args));
 
@@ -245,24 +259,8 @@ public class TestCopyTable {
         new Configuration(TEST_UTIL.getConfiguration()), args);
     Configuration configuration = opts.getConfiguration();
     args = opts.getRemainingArgs();
-    clean();
-    Job job = CopyTable.createSubmittableJob(configuration, args);
+    Job job = new CopyTable(configuration).createSubmittableJob(args);
     job.waitForCompletion(false);
     return job.isSuccessful();
   }
-
-
-  private void clean() {
-
-      CopyTable.startTime = 0;
-      CopyTable.endTime = 0;
-      CopyTable.versions = -1;
-      CopyTable.tableName = null;
-      CopyTable.startRow = null;
-      CopyTable.stopRow = null;
-      CopyTable.newTableName = null;
-      CopyTable.peerAddress = null;
-      CopyTable.families = null;
-      CopyTable.allCells = false;
-  }
 }