You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/23 23:30:57 UTC

svn commit: r797230 - in /hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase: migration/nineteen/HStoreFileToStoreFile.java util/Migrate.java

Author: stack
Date: Thu Jul 23 21:30:56 2009
New Revision: 797230

URL: http://svn.apache.org/viewvc?rev=797230&view=rev
Log:
HBASE-1215 migration; fixes for jgray -- make HStoreFileToStoreFile implement Tool so params can be passed

Modified:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/migration/nineteen/HStoreFileToStoreFile.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Migrate.java

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/migration/nineteen/HStoreFileToStoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/migration/nineteen/HStoreFileToStoreFile.java?rev=797230&r1=797229&r2=797230&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/migration/nineteen/HStoreFileToStoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/migration/nineteen/HStoreFileToStoreFile.java Thu Jul 23 21:30:56 2009
@@ -23,6 +23,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,29 +38,42 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Mapper that rewrites hbase 0.19 HStoreFiles as 0.20 StoreFiles.
- * Creates passed directories as input and output.  On startup, it checks
- * filesystem is 0.19 generation.  It then crawls the filesystem to find the
+ * Creates passed directories as input and output.  On startup, it does not
+ * check that the filesystem is 0.19 generation, so that if the job fails part
+ * way through it can simply be rerun; it will just fix whatever 0.19 regions
+ * it still finds.
+ * If the input dir does not exist, it first crawls the filesystem to find the
  * files to migrate writing a file into the input directory.  Next it starts up
  * the MR job to rewrite the 0.19 HStoreFiles as 0.20 StoreFiles deleting the
  * old as it goes.  Presumption is that only
- * one file per in the family Store else stuff breaks; i.e. the 0.19 install
- * was major compacted before migration began.
+ * one file per family Store else stuff breaks; i.e. the 0.19 install
+ * was major compacted before migration began.  If this job fails, fix the
+ * cause and rerun the job.  You may want to edit the generated file in the
+ * input dir first.
  */
-public class HStoreFileToStoreFile {
+public class HStoreFileToStoreFile extends Configured implements Tool {
   static final Log LOG = LogFactory.getLog(HStoreFileToStoreFile.class);
   public static final String JOBNAME = "hsf2sf";
 
+  HStoreFileToStoreFile() {
+    super();
+  }
+
   public static class Map extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
-    protected void map(LongWritable key, Text value, Mapper<LongWritable,Text, LongWritable, LongWritable>.Context context)
+    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
     throws java.io.IOException, InterruptedException {
       HBaseConfiguration c = new HBaseConfiguration(context.getConfiguration());
-      Migrate.rewrite(c, FileSystem.get(c), new Path(value.toString()));
+      Path p = new Path(value.toString());
+      context.setStatus(key.toString() + " " + p.toString());
+      Migrate.rewrite(c, FileSystem.get(c), p);
     }
   }
 
@@ -67,108 +81,107 @@
       final FileSystem fs, final Path dir)
   throws IOException {
     if (fs.exists(dir)) {
-      throw new IOException("Input exists -- please specify empty input dir");
+      LOG.warn("Input directory already exits. Using content for this MR job.");
+      return;
+    }
+    FSDataOutputStream out = fs.create(new Path(dir, "mapfiles"));
+    try {
+      gathermapfiles(conf, fs, out);
+    } finally {
+      if (out != null) out.close();
     }
-    gathermapfiles(conf, fs, dir);
   }
   
   private static void gathermapfiles(final HBaseConfiguration conf,
-      final FileSystem fs, final Path dir)
+      final FileSystem fs, final FSDataOutputStream out)
   throws IOException {
-    int index = 0;
-    FSDataOutputStream out = getOut(fs, dir, index, null);
-    try {
-      // Presumes any directory under hbase.rootdir is a table.
-      FileStatus [] tableDirs =
-        fs.listStatus(FSUtils.getRootDir(conf), new DirFilter(fs));
-      for (int i = 0; i < tableDirs.length; i++) {
-        // Inside a table, there are compaction.dir directories to skip.
-        // Otherwise, all else should be regions.  Then in each region, should
-        // only be family directories.  Under each of these, should be a mapfile
-        // and info directory and in these only one file.
-        Path d = tableDirs[i].getPath();
-        if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
-        FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
-        for (int j = 0; j < regionDirs.length; j++) {
-          Path dd = regionDirs[j].getPath();
-          if (dd.equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
-          // Else its a region name.  Now look in region for families.
-          FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
-          for (int k = 0; k < familyDirs.length; k++) {
-            Path family = familyDirs[k].getPath();
-            FileStatus [] infoAndMapfile = fs.listStatus(family);
-            // Assert that only info and mapfile in family dir.
-            if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
-              LOG.warn(family.toString() +
-                  " has more than just info and mapfile: " + infoAndMapfile.length + ". Continuing...");
-              continue;
-            }
-            // Make sure directory named info or mapfile.
-            for (int ll = 0; ll < 2; ll++) {
-              if (infoAndMapfile[ll].getPath().getName().equals("info") ||
-                  infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
-                continue;
-              LOG.warn("Unexpected directory name: " +
-                  infoAndMapfile[ll].getPath() + ". Continuing...");
-              continue;
-            }
-            // Now in family, there are 'mapfile' and 'info' subdirs.  Just
-            // look in the 'mapfile' subdir.
-            FileStatus [] familyStatus =
-              fs.listStatus(new Path(family, "mapfiles"));
-            if (familyStatus.length > 1) {
-              LOG.warn(family.toString() + " has " + familyStatus.length +
-              " files.  Continuing...");
+    // Presumes any directory under hbase.rootdir is a table.
+    FileStatus [] tableDirs =
+      fs.listStatus(FSUtils.getRootDir(conf), new DirFilter(fs));
+    for (int i = 0; i < tableDirs.length; i++) {
+      // Inside a table, there are compaction.dir directories to skip.
+      // Otherwise, all else should be regions.  Then in each region, should
+      // only be family directories.  Under each of these, should be a mapfile
+      // and info directory and in these only one file.
+      Path d = tableDirs[i].getPath();
+      if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
+      FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
+      for (int j = 0; j < regionDirs.length; j++) {
+        Path dd = regionDirs[j].getPath();
+        if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
+        // Else it's a region name.  Now look in region for families.
+        FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
+        families: for (int k = 0; k < familyDirs.length; k++) {
+          Path family = familyDirs[k].getPath();
+          FileStatus [] infoAndMapfile = fs.listStatus(family);
+          // Expect exactly two entries in a family dir: 'info' and 'mapfiles'.
+          if (infoAndMapfile.length != 2) {
+            LOG.warn(family.toString() + " does not hold exactly info and mapfiles: " +
+              infoAndMapfile.length + ". Continuing...");
+            continue;
+          }
+          // Both entries must be named info or mapfiles; otherwise skip family.
+          for (int ll = 0; ll < 2; ll++) {
+            if (infoAndMapfile[ll].getPath().getName().equals("info") ||
+                infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
               continue;
-            }
-            if (familyStatus.length == 1) {
-              // If we got here, then this is good.  Add the mapfile to out
-              String str = familyStatus[0].getPath().makeQualified(fs).toString();
-              LOG.info(str);
-              out.write(Bytes.toBytes(str + "\n"));
-              if (index++ % 100 == 0) {
-                if (index != 0) {
-                  out = getOut(fs, dir, index, out);
-                }
-              }
-            } else {
-              LOG.warn("Empty store " + family.toString());
-            }
+            LOG.warn("Unexpected directory name: " +
+                infoAndMapfile[ll].getPath() + ". Continuing...");
+            continue families;
+          }
+          // Now in family, there are 'mapfile' and 'info' subdirs.  Just
+          // look in the 'mapfile' subdir.
+          Path mfsdir = new Path(family, "mapfiles");
+          FileStatus [] familyStatus = fs.listStatus(mfsdir);
+          if (familyStatus.length > 1) {
+            LOG.warn(family.toString() + " has " + familyStatus.length +
+            " files.  Continuing...");
+            continue;
+          }
+          if (familyStatus.length == 1) {
+            // If we got here, then this is good.  Add the mapfile to out
+            String str = familyStatus[0].getPath().makeQualified(fs).toString();
+            LOG.info(str);
+            out.write(Bytes.toBytes(str + "\n"));
+          } else {
+            // Special case.  Empty region.  Remove the mapfiles and info dirs.
+            Path infodir = new Path(family, "info");
+            LOG.info("Removing " + mfsdir + " and " + infodir + " because empty");
+            fs.delete(mfsdir, true);
+            fs.delete(infodir, true);
           }
         }
       }
-    } finally {
-      out.close();
     }
   }
-  
-  private static FSDataOutputStream getOut(final FileSystem fs, final Path dir,
-      final int index, FSDataOutputStream out)
-  throws IOException {
-    if (out == null) out.close();
-    return fs.create(new Path(dir, "mapfiles-" + index));
-  }
 
-  public static void main(String[] args) throws Exception {
-    HBaseConfiguration conf = new HBaseConfiguration();
-    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length < 2) {
-      System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      System.err.println("Usage: " + HStoreFileToStoreFile.class.getName() + 
-        " <inputdir> <output>");
-      System.exit(-1);
+  public int run(final String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("ERROR: Wrong number of arguments: " + args.length);
+      System.err.println("Usage: " + getClass().getSimpleName() +
+        " <inputdir> <outputdir>");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return -1;
     }
     Path input = new Path(args[0]);
+    HBaseConfiguration conf = (HBaseConfiguration)getConf();
     FileSystem fs = FileSystem.get(conf);
     writeInputFiles(conf, fs, input);
     Job job = new Job(conf);
     job.setJarByClass(HStoreFileToStoreFile.class);
     job.setJobName(JOBNAME);
+    job.setInputFormatClass(TextInputFormat.class);
     job.setMapperClass(Map.class);
     job.setNumReduceTasks(0);
-    TextInputFormat.setInputPaths(job, input);
+    FileInputFormat.addInputPath(job, input);
     Path output = new Path(args[1]);
     FileOutputFormat.setOutputPath(job, output);
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int exitCode = ToolRunner.run(new HBaseConfiguration(),
+      new HStoreFileToStoreFile(), args);
+    System.exit(exitCode);
   }
 }
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Migrate.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Migrate.java?rev=797230&r1=797229&r2=797230&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Migrate.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Migrate.java Thu Jul 23 21:30:56 2009
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.migration.nineteen.io.BloomFilterMapFile;
@@ -86,7 +87,6 @@
  */
 public class Migrate extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(Migrate.class);
-  private final HBaseConfiguration conf;
   private FileSystem fs;
   boolean migrationNeeded = false;
   boolean check = false;
@@ -100,26 +100,24 @@
   private static final String MIGRATION_LINK = 
     " See http://wiki.apache.org/hadoop/Hbase/HowToMigrate for more information.";
 
-  /** default constructor */
-  public Migrate() {
-    this(new HBaseConfiguration());
-  }
-  
   /**
-   * @param conf
+   * Creates a Migrate with no configuration; one must be set (e.g. via setConf) before use.
    */
-  public Migrate(HBaseConfiguration conf) {
-    super(conf);
-    this.conf = conf;
+  public Migrate() {
+    super();
   }
-  
+
+  public Migrate(final HBaseConfiguration c) {
+    super(c);
+  }
+
   /*
    * Sets the hbase rootdir as fs.default.name.
    * @return True if succeeded.
    */
   private boolean setFsDefaultName() {
     // Validate root directory path
-    Path rd = new Path(conf.get(HConstants.HBASE_DIR));
+    Path rd = new Path(getConf().get(HConstants.HBASE_DIR));
     try {
       // Validate root directory path
       FSUtils.validateRootPath(rd);
@@ -129,7 +127,7 @@
           " configuration parameter '" + HConstants.HBASE_DIR + "'", e);
       return false;
     }
-    this.conf.set("fs.default.name", rd.toString());
+    getConf().set("fs.default.name", rd.toString());
     return true;
   }
 
@@ -139,7 +137,7 @@
   private boolean verifyFilesystem() {
     try {
       // Verify file system is up.
-      fs = FileSystem.get(conf);                        // get DFS handle
+      fs = FileSystem.get(getConf());                        // get DFS handle
       LOG.info("Verifying that file system is available..");
       FSUtils.checkFileSystemAvailable(fs);
       return true;
@@ -154,7 +152,7 @@
     LOG.info("Verifying that HBase is not running...." +
           "Trys ten times  to connect to running master");
     try {
-      HBaseAdmin.checkHBaseAvailable(conf);
+      HBaseAdmin.checkHBaseAvailable((HBaseConfiguration)getConf());
       LOG.fatal("HBase cluster must be off-line.");
       return false;
     } catch (MasterNotRunningException e) {
@@ -177,7 +175,8 @@
       LOG.info("Starting upgrade" + (check ? " check" : ""));
 
       // See if there is a file system version file
-      String versionStr = FSUtils.getVersion(fs, FSUtils.getRootDir(this.conf));
+      String versionStr = FSUtils.getVersion(fs,
+        FSUtils.getRootDir((HBaseConfiguration)getConf()));
       if (versionStr == null) {
         throw new IOException("File system version file " +
             HConstants.VERSION_FILE_NAME +
@@ -202,7 +201,7 @@
       if (!check) {
         // Set file system version
         LOG.info("Setting file system version.");
-        FSUtils.setVersion(fs, FSUtils.getRootDir(this.conf));
+        FSUtils.setVersion(fs, FSUtils.getRootDir((HBaseConfiguration)getConf()));
         LOG.info("Upgrade successful.");
       } else if (this.migrationNeeded) {
         LOG.info("Upgrade needed.");
@@ -220,7 +219,7 @@
       return;
     }
     // Before we start, make sure all is major compacted.
-    Path hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
+    Path hbaseRootDir = new Path(getConf().get(HConstants.HBASE_DIR));
     boolean pre020 = FSUtils.isPre020FileLayout(fs, hbaseRootDir);
     if (pre020) {
       LOG.info("Checking pre020 filesystem is major compacted");
@@ -244,7 +243,7 @@
         throw new IOException(msg);
       }
     }
-    final MetaUtils utils = new MetaUtils(this.conf);
+    final MetaUtils utils = new MetaUtils((HBaseConfiguration)getConf());
     final List<HRegionInfo> metas = new ArrayList<HRegionInfo>();
     try {
       // Rewrite root.
@@ -334,12 +333,20 @@
           if (mfs.length > 1) {
             throw new IOException("Should only be one directory in: " + mfdir);
           }
-          rewrite(this.conf, this.fs, mfs[0].getPath());
+          if (mfs.length == 0) {
+            // Special case.  Empty region.  Remove the mapfiles and info dirs.
+            Path infodir = new Path(family, "info");
+            LOG.info("Removing " + mfdir + " and " + infodir + " because empty");
+            fs.delete(mfdir, true);
+            fs.delete(infodir, true);
+          } else {
+            rewrite((HBaseConfiguration)getConf(), this.fs, mfs[0].getPath());
+          }
         }
       }
     }
   }
-  
+
   /**
    * Rewrite the passed 0.19 mapfile as a 0.20 file.
    * @param fs
@@ -359,11 +366,12 @@
       Integer.parseInt(regiondir.getName()),
       Bytes.toBytes(familydir.getName()), Long.parseLong(mf.getName()), null);
     BloomFilterMapFile.Reader src = hsf.getReader(fs, false, false);
-    HFile.Writer tgt = StoreFile.getWriter(fs, familydir);
+    HFile.Writer tgt = StoreFile.getWriter(fs, familydir,
+      conf.getInt("hfile.min.blocksize.size", 64*1024),
+      Compression.Algorithm.NONE, getComparator(basedir));
     // From old 0.19 HLogEdit.
     ImmutableBytesWritable deleteBytes =
       new ImmutableBytesWritable("HBASE::DELETEVAL".getBytes("UTF-8"));
-
     try {
       while (true) {
         HStoreKey key = new HStoreKey();
@@ -398,6 +406,13 @@
     }
   }
 
+  private static KeyValue.KeyComparator getComparator(final Path tabledir) {
+    String tablename = tabledir.getName();
+    return tablename.equals("-ROOT-")? KeyValue.ROOT_KEY_COMPARATOR:
+      tablename.equals(".META.")? KeyValue.META_KEY_COMPARATOR:
+        KeyValue.KEY_COMPARATOR;
+  }
+
   /*
    * Enable blockcaching on catalog tables.
    * @param mr
@@ -523,11 +538,11 @@
   public static void main(String[] args) {
     int status = 0;
     try {
-      status = ToolRunner.run(new Migrate(), args);
+      status = ToolRunner.run(new HBaseConfiguration(), new Migrate(), args);
     } catch (Exception e) {
       LOG.error(e);
       status = -1;
     }
     System.exit(status);
   }
-}
+}
\ No newline at end of file