You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@hbase.apache.org by te...@apache.org on 2012/07/11 02:09:32 UTC

svn commit: r1359959 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: tedyu
Date: Wed Jul 11 00:09:31 2012
New Revision: 1359959

URL: http://svn.apache.org/viewvc?rev=1359959&view=rev
Log:
HBASE-6337 [MTTR] Remove renaming tmp log file in SplitLogManager (Chunhui)


Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1359959&r1=1359958&r2=1359959&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Jul 11 00:09:31 2012
@@ -133,10 +133,8 @@ public class SplitLogManager extends Zoo
     this(zkw, conf, stopper, serverName, new TaskFinisher() {
       @Override
       public Status finish(String workerName, String logfile) {
-        String tmpname =
-          ZKSplitLog.getSplitLogDirTmpComponent(workerName, logfile);
         try {
-          HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
+          HLogSplitter.finishSplitLogFile(logfile, conf);
         } catch (IOException e) {
           LOG.warn("Could not finish splitting of log file " + logfile);
           return Status.ERR;

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1359959&r1=1359958&r2=1359959&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Jul 11 00:09:31 2012
@@ -108,9 +108,7 @@ public class SplitLogWorker extends ZooK
         // interrupted or has encountered a transient error and when it has
         // encountered a bad non-retry-able persistent error.
         try {
-          String tmpname =
-            ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
-          if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
+          if (HLogSplitter.splitLogFile(rootdir,
               fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
             return Status.PREEMPTED;
           }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1359959&r1=1359958&r2=1359959&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Jul 11 00:09:31 2012
@@ -326,16 +326,13 @@ public class HLogSplitter {
   }
 
   /**
-   * Splits a HLog file into a temporary staging area. tmpname is used to build
-   * the name of the staging area where the recovered-edits will be separated
-   * out by region and stored.
+   * Splits a HLog file into region's recovered-edits directory
    * <p>
    * If the log file has N regions then N recovered.edits files will be
    * produced. There is no buffering in this code. Instead it relies on the
    * buffering in the SequenceFileWriter.
    * <p>
    * @param rootDir
-   * @param tmpname
    * @param logfile
    * @param fs
    * @param conf
@@ -343,16 +340,16 @@ public class HLogSplitter {
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
-      FileStatus logfile, FileSystem fs,
-      Configuration conf, CancelableProgressable reporter) throws IOException {
+  static public boolean splitLogFile(Path rootDir, FileStatus logfile,
+      FileSystem fs, Configuration conf, CancelableProgressable reporter)
+      throws IOException {
     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
         fs);
-    return s.splitLogFileToTemp(logfile, tmpname, reporter);
+    return s.splitLogFile(logfile, reporter);
   }
 
-  public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
-      CancelableProgressable reporter)  throws IOException {	    
+  public boolean splitLogFile(FileStatus logfile,
+      CancelableProgressable reporter) throws IOException {
     final Map<byte[], Object> logWriters = Collections.
     synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
     boolean isCorrupted = false;
@@ -384,7 +381,7 @@ public class HLogSplitter {
       in = getReader(fs, logfile, conf, skipErrors);
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not get reader, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       isCorrupted = true;
     }
     if (in == null) {
@@ -412,7 +409,7 @@ public class HLogSplitter {
         }
         WriterAndPath wap = (WriterAndPath)o;
         if (wap == null) {
-          wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
+          wap = createWAP(region, entry, rootDir, fs, conf);
           numNewlyOpenedFiles++;
           if (wap == null) {
             // ignore edits from this region. It doesn't exist anymore.
@@ -447,7 +444,7 @@ public class HLogSplitter {
       }
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       isCorrupted = true;
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
@@ -524,94 +521,40 @@ public class HLogSplitter {
   }
 
   /**
-   * Completes the work done by splitLogFileToTemp by moving the
-   * recovered.edits from the staging area to the respective region server's
-   * directories.
+   * Completes the work done by splitLogFile by archiving logs
    * <p>
    * It is invoked by SplitLogManager once it knows that one of the
-   * SplitLogWorkers have completed the splitLogFileToTemp() part. If the
-   * master crashes then this function might get called multiple times.
+   * SplitLogWorkers have completed the splitLogFile() part. If the master
+   * crashes then this function might get called multiple times.
    * <p>
-   * @param tmpname
+   * @param logfile
    * @param conf
    * @throws IOException
    */
-  public static void moveRecoveredEditsFromTemp(String tmpname,
-      String logfile, Configuration conf)
-  throws IOException{
+  public static void finishSplitLogFile(String logfile, Configuration conf)
+      throws IOException {
     Path rootdir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-    moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
+    finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
   }
 
-  public static void moveRecoveredEditsFromTemp(String tmpname,
-      Path rootdir, Path oldLogDir,
-      String logfile, Configuration conf)
-  throws IOException{
+  public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+      String logfile, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
     FileSystem fs;
     fs = rootdir.getFileSystem(conf);
     Path logPath = new Path(logfile);
-    if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+    if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
       corruptedLogs.add(logPath);
     } else {
       processedLogs.add(logPath);
     }
-    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
-    List<FileStatus> files = listAll(fs, stagingDir);
-    for (FileStatus f : files) {
-      Path src = f.getPath();
-      Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
-      if (ZKSplitLog.isCorruptFlagFile(dst)) {
-        continue;
-      }
-      if (fs.exists(src)) {
-        if (fs.exists(dst)) {
-          fs.delete(dst, false);
-        } else {
-          Path regionDir = dst.getParent().getParent();
-          if (!fs.exists(regionDir)) {
-            // See HBASE-6050.
-            LOG.warn("Could not move recovered edits from " + src
-                + " to destination " + regionDir + " as it doesn't exist.");
-            continue;
-          }
-          Path dstdir = dst.getParent();
-          if (!fs.exists(dstdir)) {
-            if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
-          }
-        }
-        fs.rename(src, dst);
-        LOG.debug(" moved " + src + " => " + dst);
-      } else {
-        LOG.debug("Could not move recovered edits from " + src +
-            " as it doesn't exist");
-      }
-    }
-    archiveLogs(null, corruptedLogs, processedLogs,
-        oldLogDir, fs, conf);
+    archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
     fs.delete(stagingDir, true);
-    return;
   }
 
-  private static List<FileStatus> listAll(FileSystem fs, Path dir)
-  throws IOException {
-    List<FileStatus> fset = new ArrayList<FileStatus>(100);
-    FileStatus [] files = fs.exists(dir)? fs.listStatus(dir): null;
-    if (files != null) {
-      for (FileStatus f : files) {
-        if (f.isDir()) {
-          fset.addAll(listAll(fs, f.getPath()));
-        } else {
-          fset.add(f);
-        }
-      }
-    }
-    return fset;
-  }
-
-
   /**
    * Moves processed logs to a oldLogDir after successful processing Moves
    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
@@ -1102,15 +1045,14 @@ public class HLogSplitter {
     }
   }
 
-  private WriterAndPath createWAP(byte[] region, Entry entry,
-      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+  private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
+      FileSystem fs, Configuration conf)
   throws IOException {
-    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir,
-        tmpname==null);
+    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
     if (regionedits == null) {
       return null;
     }
-    if ((tmpname == null) && fs.exists(regionedits)) {
+    if (fs.exists(regionedits)) {
       LOG.warn("Found existing old edits file. It could be the "
           + "result of a previous failed split attempt. Deleting "
           + regionedits + ", length="
@@ -1119,18 +1061,10 @@ public class HLogSplitter {
         LOG.warn("Failed delete of old " + regionedits);
       }
     }
-    Path editsfile;
-    if (tmpname != null) {
-      // During distributed log splitting the output by each
-      // SplitLogWorker is written to a temporary area.
-      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
-    } else {
-      editsfile = regionedits;
-    }
-    Writer w = createWriter(fs, editsfile, conf);
-    LOG.debug("Creating writer path=" + editsfile + " region="
+    Writer w = createWriter(fs, regionedits, conf);
+    LOG.debug("Creating writer path=" + regionedits + " region="
         + Bytes.toStringBinary(region));
-    return (new WriterAndPath(editsfile, w));
+    return (new WriterAndPath(regionedits, w));
   }
 
   Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
@@ -1318,7 +1252,7 @@ public class HLogSplitter {
       if (blacklistedRegions.contains(region)) {
         return null;
       }
-      ret = createWAP(region, entry, rootDir, null, fs, conf);
+      ret = createWAP(region, entry, rootDir, fs, conf);
       if (ret == null) {
         blacklistedRegions.add(region);
         return null;

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1359959&r1=1359958&r2=1359959&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Wed Jul 11 00:09:31 2012
@@ -156,27 +156,13 @@ public class ZKSplitLog {
     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
   }
 
-  public static Path stripSplitLogTempDir(Path rootdir, Path file) {
-    int skipDepth = rootdir.depth() + 2;
-    List<String> components = new ArrayList<String>(10);
-    do {
-      components.add(file.getName());
-      file = file.getParent();
-    } while (file.depth() > skipDepth);
-    Path ret = rootdir;
-    for (int i = components.size() - 1; i >= 0; i--) {
-      ret = new Path(ret, components.get(i));
-    }
-    return ret;
-  }
-
   public static String getSplitLogDirTmpComponent(String worker, String file) {
     return (worker + "_" + ZKSplitLog.encode(file));
   }
 
-  public static void markCorrupted(Path rootdir, String tmpname,
+  public static void markCorrupted(Path rootdir, String logFileName,
       FileSystem fs) {
-    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
     try {
       fs.createNewFile(file);
     } catch (IOException e) {
@@ -185,18 +171,14 @@ public class ZKSplitLog {
     }
   }
 
-  public static boolean isCorrupted(Path rootdir, String tmpname,
+  public static boolean isCorrupted(Path rootdir, String logFileName,
       FileSystem fs) throws IOException {
-    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
     boolean isCorrupt;
     isCorrupt = fs.exists(file);
     return isCorrupt;
   }
 
-  public static boolean isCorruptFlagFile(Path file) {
-    return file.getName().equals("corrupt");
-  }
-
 
   public static class Counters {
     //SplitLogManager counters

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1359959&r1=1359958&r2=1359959&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Jul 11 00:09:31 2012
@@ -1016,10 +1016,9 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
@@ -1046,10 +1045,9 @@ public class TestHLogSplit {
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
     
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     
     assertTrue(!fs.exists(regiondir));
     assertTrue(true);
@@ -1065,10 +1063,9 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
     assertFalse(fs.exists(tdir));
 
@@ -1082,10 +1079,9 @@ public class TestHLogSplit {
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     for (String region : regions) {
       Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals(10, countHLog(recovered, fs, conf));
@@ -1103,10 +1099,9 @@ public class TestHLogSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
         "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1359959&r1=1359958&r2=1359959&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Wed Jul 11 00:09:31 2012
@@ -563,10 +563,10 @@ public class TestWALReplay {
     wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
-    HLogSplitter.splitLogFileToTemp(hbaseRootDir, hbaseRootDir + "/temp", listStatus[0], this.fs,
-        this.conf, null);
-    FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/temp/" + tableNameStr
-        + "/" + hri.getEncodedName() + "/recovered.edits"));
+    HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
+        null);
+    FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
+        + tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
     int editCount = 0;
     for (FileStatus fileStatus : listStatus1) {
       editCount = Integer.parseInt(fileStatus.getPath().getName());