You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/07/11 02:07:53 UTC

svn commit: r1359957 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: tedyu
Date: Wed Jul 11 00:07:53 2012
New Revision: 1359957

URL: http://svn.apache.org/viewvc?rev=1359957&view=rev
Log:
HBASE-6337 [MTTR] Remove renaming tmp log file in SplitLogManager (Chunhui)


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Jul 11 00:07:53 2012
@@ -139,10 +139,8 @@ public class SplitLogManager extends Zoo
     this(zkw, conf, stopper, serverName, new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
-        String tmpname =
-          ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile);
         try {
-          HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
+          HLogSplitter.finishSplitLogFile(logfile, conf);
         } catch (IOException e) {
           LOG.warn("Could not finish splitting of log file " + logfile);
           return Status.ERR;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Jul 11 00:07:53 2012
@@ -108,9 +108,7 @@ public class SplitLogWorker extends ZooK
         // interrupted or has encountered a transient error and when it has
         // encountered a bad non-retry-able persistent error.
         try {
-          String tmpname =
-            ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename);
-          if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
+          if (HLogSplitter.splitLogFile(rootdir,
               fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
             return Status.PREEMPTED;
           }
@@ -579,4 +577,4 @@ public class SplitLogWorker extends ZooK
     }
     public Status exec(String name, CancelableProgressable p);
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Jul 11 00:07:53 2012
@@ -346,15 +346,12 @@ public class HLogSplitter {
   }
 
   /**
-   * Splits a HLog file into a temporary staging area. tmpname is used to build
-   * the name of the staging area where the recovered-edits will be separated
-   * out by region and stored.
+   * Splits a HLog file into region's recovered-edits directory
    * <p>
    * If the log file has N regions then N recovered.edits files will be
    * produced.
    * <p>
    * @param rootDir
-   * @param tmpname
    * @param logfile
    * @param fs
    * @param conf
@@ -362,16 +359,15 @@ public class HLogSplitter {
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
-      FileStatus logfile, FileSystem fs,
-      Configuration conf, CancelableProgressable reporter) throws IOException {
+  static public boolean splitLogFile(Path rootDir, FileStatus logfile,
+      FileSystem fs, Configuration conf, CancelableProgressable reporter)
+      throws IOException {
     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs);
-    return s.splitLogFileToTemp(logfile, tmpname, reporter);
+    return s.splitLogFile(logfile, reporter);
   }
 
-  public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
-      CancelableProgressable reporter)
-  throws IOException {
+  public boolean splitLogFile(FileStatus logfile,
+      CancelableProgressable reporter) throws IOException {
     boolean isCorrupted = false;
     Preconditions.checkState(status == null);
     status = TaskMonitor.get().createStatus(
@@ -389,7 +385,7 @@ public class HLogSplitter {
       in = getReader(fs, logfile, conf, skipErrors);
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not get reader, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       isCorrupted = true;
     }
     if (in == null) {
@@ -397,8 +393,7 @@ public class HLogSplitter {
       LOG.warn("Nothing to split in log file " + logPath);
       return true;
     }
-    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(
-        reporter, tmpname));
+    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
     if (!reportProgressIfIsDistributedLogSplitting()) {
       return false;
     }
@@ -431,7 +426,7 @@ public class HLogSplitter {
       throw iie;
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       isCorrupted = true;
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
@@ -451,94 +446,40 @@ public class HLogSplitter {
   }
 
   /**
-   * Completes the work done by splitLogFileToTemp by moving the
-   * recovered.edits from the staging area to the respective region server's
-   * directories.
+   * Completes the work done by splitLogFile by archiving logs
    * <p>
    * It is invoked by SplitLogManager once it knows that one of the
-   * SplitLogWorkers have completed the splitLogFileToTemp() part. If the
-   * master crashes then this function might get called multiple times.
+   * SplitLogWorkers have completed the splitLogFile() part. If the master
+   * crashes then this function might get called multiple times.
    * <p>
-   * @param tmpname
+   * @param logfile
    * @param conf
    * @throws IOException
    */
-  public static void moveRecoveredEditsFromTemp(String tmpname,
-      String logfile, Configuration conf)
-  throws IOException{
+  public static void finishSplitLogFile(String logfile, Configuration conf)
+      throws IOException {
     Path rootdir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-    moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
+    finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
   }
 
-  public static void moveRecoveredEditsFromTemp(String tmpname,
-      Path rootdir, Path oldLogDir,
-      String logfile, Configuration conf)
-  throws IOException{
+  public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+      String logfile, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
     FileSystem fs;
     fs = rootdir.getFileSystem(conf);
     Path logPath = new Path(logfile);
-    if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+    if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
       corruptedLogs.add(logPath);
     } else {
       processedLogs.add(logPath);
     }
-    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
-    List<FileStatus> files = listAll(fs, stagingDir);
-    for (FileStatus f : files) {
-      Path src = f.getPath();
-      Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
-      if (ZKSplitLog.isCorruptFlagFile(dst)) {
-        continue;
-      }
-      if (fs.exists(src)) {
-        if (fs.exists(dst)) {
-          fs.delete(dst, false);
-        } else {
-          Path regionDir = dst.getParent().getParent();
-          if (!fs.exists(regionDir)) {
-            // See HBASE-6050.
-            LOG.warn("Could not move recovered edits from " + src +
-            " to destination " + regionDir + " as it doesn't exist.");
-            continue;
-          }
-          Path dstdir = dst.getParent();
-          if (!fs.exists(dstdir)) {
-            if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
-          }
-        }
-        fs.rename(src, dst);
-        LOG.debug(" moved " + src + " => " + dst);
-      } else {
-        LOG.debug("Could not move recovered edits from " + src +
-            " as it doesn't exist");
-      }
-    }
-    archiveLogs(null, corruptedLogs, processedLogs,
-        oldLogDir, fs, conf);
+    archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
     fs.delete(stagingDir, true);
-    return;
   }
 
-  private static List<FileStatus> listAll(FileSystem fs, Path dir)
-  throws IOException {
-    List<FileStatus> fset = new ArrayList<FileStatus>(100);
-    FileStatus [] files = fs.exists(dir)? fs.listStatus(dir): null;
-    if (files != null) {
-      for (FileStatus f : files) {
-        if (f.isDir()) {
-          fset.addAll(listAll(fs, f.getPath()));
-        } else {
-          fset.add(f);
-        }
-      }
-    }
-    return fset;
-  }
-
-
   /**
    * Moves processed logs to a oldLogDir after successful processing Moves
    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
@@ -1027,14 +968,14 @@ public class HLogSplitter {
     }
   }
 
-  private WriterAndPath createWAP(byte[] region, Entry entry,
-      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+  private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
+      FileSystem fs, Configuration conf)
   throws IOException {
-    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, tmpname==null);
+    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
     if (regionedits == null) {
       return null;
     }
-    if ((tmpname == null) && fs.exists(regionedits)) {
+    if (fs.exists(regionedits)) {
       LOG.warn("Found existing old edits file. It could be the "
           + "result of a previous failed split attempt. Deleting "
           + regionedits + ", length="
@@ -1043,18 +984,10 @@ public class HLogSplitter {
         LOG.warn("Failed delete of old " + regionedits);
       }
     }
-    Path editsfile;
-    if (tmpname != null) {
-      // During distributed log splitting the output by each
-      // SplitLogWorker is written to a temporary area.
-      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
-    } else {
-      editsfile = regionedits;
-    }
-    Writer w = createWriter(fs, editsfile, conf);
-    LOG.debug("Creating writer path=" + editsfile + " region="
+    Writer w = createWriter(fs, regionedits, conf);
+    LOG.debug("Creating writer path=" + regionedits + " region="
         + Bytes.toStringBinary(region));
-    return (new WriterAndPath(editsfile, w));
+    return (new WriterAndPath(regionedits, w));
   }
 
   Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
@@ -1110,12 +1043,9 @@ public class HLogSplitter {
     // How often to send a progress report (default 1/2 master timeout)
     private final int report_period;
     private long last_report_at = 0;
-    private final String tmpDirName;
 
-    public DistributedLogSplittingHelper(CancelableProgressable reporter,
-        String tmpName) {
+    public DistributedLogSplittingHelper(CancelableProgressable reporter) {
       this.splitReporter = reporter;
-      this.tmpDirName = tmpName;
       report_period = conf.getInt("hbase.splitlog.report.period",
           conf.getInt("hbase.splitlog.manager.timeout",
               SplitLogManager.DEFAULT_TIMEOUT) / 2);
@@ -1139,10 +1069,6 @@ public class HLogSplitter {
         return true;
       }
     }
-
-    String getTmpDirName() {
-      return this.tmpDirName;
-    }
   }
 
   /**
@@ -1380,9 +1306,7 @@ public class HLogSplitter {
       if (blacklistedRegions.contains(region)) {
         return null;
       }
-      String tmpName = distributedLogSplittingHelper == null ? null
-          : distributedLogSplittingHelper.getTmpDirName();
-      ret = createWAP(region, entry, rootDir, tmpName, fs, conf);
+      ret = createWAP(region, entry, rootDir, fs, conf);
       if (ret == null) {
         blacklistedRegions.add(region);
         return null;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Wed Jul 11 00:07:53 2012
@@ -98,27 +98,14 @@ public class ZKSplitLog {
     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
   }
 
-  public static Path stripSplitLogTempDir(Path rootdir, Path file) {
-    int skipDepth = rootdir.depth() + 2;
-    List<String> components = new ArrayList<String>(10);
-    do {
-      components.add(file.getName());
-      file = file.getParent();
-    } while (file.depth() > skipDepth);
-    Path ret = rootdir;
-    for (int i = components.size() - 1; i >= 0; i--) {
-      ret = new Path(ret, components.get(i));
-    }
-    return ret;
-  }
 
   public static String getSplitLogDirTmpComponent(final String worker, String file) {
     return worker + "_" + ZKSplitLog.encode(file);
   }
 
-  public static void markCorrupted(Path rootdir, String tmpname,
+  public static void markCorrupted(Path rootdir, String logFileName,
       FileSystem fs) {
-    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
     try {
       fs.createNewFile(file);
     } catch (IOException e) {
@@ -127,15 +114,12 @@ public class ZKSplitLog {
     }
   }
 
-  public static boolean isCorrupted(Path rootdir, String tmpname,
+  public static boolean isCorrupted(Path rootdir, String logFileName,
       FileSystem fs) throws IOException {
-    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
     boolean isCorrupt;
     isCorrupt = fs.exists(file);
     return isCorrupt;
   }
 
-  public static boolean isCorruptFlagFile(Path file) {
-    return file.getName().equals("corrupt");
-  }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Jul 11 00:07:53 2012
@@ -1016,10 +1016,9 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
@@ -1046,10 +1045,9 @@ public class TestHLogSplit {
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
     
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     
     assertTrue(!fs.exists(regiondir));
     assertTrue(true);
@@ -1065,10 +1063,9 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
     assertFalse(fs.exists(tdir));
 
@@ -1082,10 +1079,9 @@ public class TestHLogSplit {
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     for (String region : regions) {
       Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals(10, countHLog(recovered, fs, conf));
@@ -1103,10 +1099,9 @@ public class TestHLogSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
         "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Wed Jul 11 00:07:53 2012
@@ -660,10 +660,10 @@ public class TestWALReplay {
     wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
-    HLogSplitter.splitLogFileToTemp(hbaseRootDir, hbaseRootDir + "/temp", listStatus[0], this.fs,
-        this.conf, null);
-    FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/temp/" + tableNameStr
-        + "/" + hri.getEncodedName() + "/recovered.edits"));
+    HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
+        null);
+    FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
+        + tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
     int editCount = 0;
     for (FileStatus fileStatus : listStatus1) {
       editCount = Integer.parseInt(fileStatus.getPath().getName());