Posted to commits@hbase.apache.org by li...@apache.org on 2012/12/17 20:19:44 UTC

svn commit: r1423101 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/ap...

Author: liyin
Date: Mon Dec 17 19:19:43 2012
New Revision: 1423101

URL: http://svn.apache.org/viewvc?rev=1423101&view=rev
Log:
[HBASE-7366] Make old logs directory have a subdirectory structure

Author: amirshim

Summary:
To speed up replication, it is useful to keep each region server's old logs in a separate directory, so we don't have to do an ls on a 300,000-file directory (the namenode doesn't handle that well).  So we're changing the oldlogs directory structure to contain per-server subdirectories named like "10.159.11.45%3A60020", for example:
/TITAN029-ASH3-HBASE/.oldlogs/10.159.11.45%3A60020/10.159.11.45%3A60020.1355452542203
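
(Illustrative sketch, not part of this commit: the subdirectory name is just the region server's host:port URL-encoded, the same way HLog already encodes its file prefix, which is where the "%3A" above comes from.)

    // Illustration: URLEncoder turns ':' into "%3A", so the server address
    // "10.159.11.45:60020" maps to the subdirectory "10.159.11.45%3A60020".
    import java.net.URLEncoder;

    public class OldLogsSubdirName {
      public static void main(String[] args) throws Exception {
        String serverAddress = "10.159.11.45:60020";              // host:port
        String subDir = URLEncoder.encode(serverAddress, "UTF8"); // charset name as used in HLog
        System.out.println(subDir);                               // prints 10.159.11.45%3A60020
      }
    }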

Test Plan:
Unit tests - passed.

Run on a cluster and make sure that files are moved to the correct subdirectories and are deleted after the retention time has elapsed.
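
(Illustrative sketch, not part of this commit: one way to spot-check the new layout with the Hadoop FileSystem API, using the example cluster path from the summary.)

    // Illustrative check only: list .oldlogs and confirm its entries are
    // now per-server subdirectories rather than a flat pile of log files.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckOldLogsLayout {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path oldLogDir = new Path("/TITAN029-ASH3-HBASE/.oldlogs");
        for (FileStatus s : fs.listStatus(oldLogDir)) {
          // expect directories like 10.159.11.45%3A60020
          System.out.println((s.isDir() ? "dir  " : "file ") + s.getPath().getName());
        }
      }
    }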

Reviewers: liyintang, nspiegelberg, mycnyc

Reviewed By: liyintang

CC: hbase-eng@, davejwatson

Differential Revision: https://phabricator.fb.com/D659154

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Mon Dec 17 19:19:43 2012
@@ -220,6 +220,17 @@ public final class HConstants {
   /** Like the previous, but for old logs that are about to be deleted */
   public static final String HREGION_OLDLOGDIR_NAME = ".oldlogs";
 
+  /** Boolean config to determine if we should use a subdir structure
+   * in the .oldlogs directory */
+  public static final String HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE =
+    "hbase.regionserver.oldlogs.use.subdir.structure";
+
+  /** Boolean config to determine if we should use a subdir structure in
+   * the .oldlogs directory by default */
+  public static final boolean HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT =
+    true;
+
+
   /** Used to construct the name of the compaction directory during compaction */
   public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir";
 
@@ -475,7 +486,7 @@ public final class HConstants {
      * scanner's next method.
      */
   public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size";
-  
+
   /**
    * Maximum number of bytes returned when calling a scanner's next method.
    * Note that when a single row is larger than this limit the row is still
@@ -488,7 +499,7 @@ public final class HConstants {
 
   /**
    * Maximum number of bytes returned when calling a scanner's next method.
-   * Used with partialRow parameter on the client side.  Note that when a 
+   * Used with partialRow parameter on the client side.  Note that when a
    * single row is larger than this limit, the row is still returned completely
    * if partialRow is true, otherwise, the row will be truncated in order to
    * fit the memory.
@@ -596,11 +607,11 @@ public final class HConstants {
 
   /** The number of HLogs for each region server */
   public static final String HLOG_CNT_PER_SERVER = "hbase.regionserver.hlog.cnt.perserver";
-  
+
   public static final String HLOG_FORMAT_BACKWARD_COMPATIBILITY =
       "hbase.regionserver.hlog.format.backward.compatibility";
-  
-  /** 
+
+  /**
    * The byte array represents for NO_NEXT_INDEXED_KEY;
    * The actual value is irrelevant because this is always compared by reference.
    */
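
(Illustrative sketch, not part of this commit: the new flag added above defaults to true; flipping it to false falls back to the old flat .oldlogs layout. In a real deployment it would be set in hbase-site.xml; here it is toggled via the Configuration API for demonstration.)

    // Illustration: reading/overriding the new flag with Hadoop's
    // Configuration API (deployments would set
    // hbase.regionserver.oldlogs.use.subdir.structure in hbase-site.xml).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;

    public class OldLogsSubdirFlag {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Flip off to restore the flat .oldlogs layout.
        conf.setBoolean(HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE, false);
        System.out.println(conf.getBoolean(
            HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE,
            HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT)); // false
      }
    }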

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java Mon Dec 17 19:19:43 2012
@@ -91,32 +91,48 @@ public class OldLogsCleaner extends Chor
     }
   }
 
-  @Override
-  protected void chore() {
-    try {
-      FileStatus[] files = this.fs.listStatus(this.oldLogDir);
-      if (files == null) {
-        // We don't have any files to process.
-        return;
+  /**
+   * Delete log files directories recursively.
+   * @param files The list of files/directories to traverse.
+   * @param deleteCountLeft Max number of files to delete
+   * @param maxDepth Max Directory depth to recurse
+   * @return Number of files left to delete (deleteCountLeft - number deleted)
+   * @throws IOException
+   */
+  private int cleanFiles(FileStatus[] files, int deleteCountLeft,
+                         int maxDepth) throws IOException {
+    if (files == null || files.length == 0) return deleteCountLeft;
+    if (maxDepth <= 0) {
+      LOG.warn("Old Logs directory structure is too deep: " + files[0].getPath());
+      return deleteCountLeft;
+    }
+    for (FileStatus file : files) {
+      if (deleteCountLeft <= 0) return 0; // we don't have anymore to delete
+      if (file.isDir()) {
+        deleteCountLeft = cleanFiles(this.fs.listStatus(file.getPath()),
+                                     deleteCountLeft, maxDepth - 1);
+        continue;
       }
-      int nbDeletedLog = 0;
-      for (FileStatus file : files) {
-        Path filePath = file.getPath();
-        if (HLog.validateHLogFilename(filePath.getName())) {
-          if (logCleaner.isLogDeletable(filePath) ) {
-            this.fs.delete(filePath, true);
-            nbDeletedLog++;
-          }
-        } else {
-          LOG.warn("Found a wrongly formated file: "
-              + file.getPath().getName());
+      Path filePath = file.getPath();
+      if (HLog.validateHLogFilename(filePath.getName())) {
+        if (logCleaner.isLogDeletable(filePath) ) {
           this.fs.delete(filePath, true);
-          nbDeletedLog++;
-        }
-        if (nbDeletedLog >= maxDeletedLogs) {
-          break;
+          deleteCountLeft--;
         }
+      } else {
+        LOG.warn("Found a wrongly formatted file: "
+            + file.getPath().getName());
+        this.fs.delete(filePath, true);
+        deleteCountLeft--;
       }
+    }
+    return deleteCountLeft;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      cleanFiles(this.fs.listStatus(this.oldLogDir), maxDeletedLogs, 2);
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
       LOG.warn("Error while cleaning the logs", e);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Dec 17 19:19:43 2012
@@ -242,7 +242,7 @@ public class HLog implements Syncable {
 
   // Lock to guarantee the ordering of log entries in HLOG
   private final Object appendLock = new Object();
-  
+
   private final boolean enabled;
 
   /*
@@ -330,25 +330,25 @@ public class HLog implements Syncable {
   /**
    * Double list buffer for WAL that allows entries to be
    * appended while sync is in progress
-   * 
+   *
    * CurrentList is for buffering appended entries;
    * syncList contains entries being synced to persistent storage;
    */
   private class DoubleListBuffer {
     private LinkedList<Entry> currentList = new LinkedList<Entry>();
     private LinkedList<Entry> syncList = new LinkedList<Entry>();
-    
+
     /**
-     * Append a log entry into the buffer 
+     * Append a log entry into the buffer
      * @param entry log entry
      */
     synchronized private void appendToBuffer(Entry entry) {
       currentList.add(entry);
     }
-    
+
     /**
      * Sync buffered log entries into persistent storage
-     * 
+     *
      * @return number of log entries synced
      */
     private int appendAndSync() throws IOException {
@@ -363,14 +363,14 @@ public class HLog implements Syncable {
         syncList = currentList;
         currentList = tmp;
       }
-      
+
       // append entries to writer
       int syncedEntries = syncList.size();
       while (!syncList.isEmpty()) {
         Entry entry = syncList.remove();
         writer.append(entry);
       }
-      
+
       // sync the data
       long now = System.currentTimeMillis();
       writer.sync();
@@ -378,7 +378,7 @@ public class HLog implements Syncable {
       return syncedEntries;
     }
   }
-  
+
   private DoubleListBuffer logBuffer = new DoubleListBuffer();
 
   /**
@@ -437,17 +437,17 @@ public class HLog implements Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-    
+
     if (!fs.exists(oldLogDir)) {
       fs.mkdirs(oldLogDir);
     }
     this.oldLogDir = oldLogDir;
-    
+
     if (!fs.exists(dir)) {
       fs.mkdirs(dir);
     }
     this.dir = dir;
-    
+
     this.hlogIndexID = hlogIndexID;
     this.hlogName = "HLog-" + this.hlogIndexID + " ";
 
@@ -461,14 +461,14 @@ public class HLog implements Syncable {
     if (actionListener != null) {
       addLogActionsListerner(actionListener);
     }
-    
+
     // If prefix is null||empty, then just name it hlog.
     if (conf.getBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, true)) {
       this.prefix = prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8");
       LOG.warn("Still using old hlog prefix due to HLOG_FORMAT_BACK_COMPATIBILITY: " + this.prefix);
     } else {
       // Also append the current hlogIndexId-totalHLogCnt to the prefix.
-      this.prefix = (prefix == null || prefix.isEmpty() ? 
+      this.prefix = (prefix == null || prefix.isEmpty() ?
           "hlog" : URLEncoder.encode(prefix, "UTF8"))
           + "." + hlogIndexID + "-" + totalHLogCnt;
       LOG.info("HLog prefix is " + this.prefix);
@@ -495,7 +495,7 @@ public class HLog implements Syncable {
     } else {
       LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
     }
-    
+
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
     Threads.setDaemonThreadRunning(logSyncerThread.getThread(),
         Thread.currentThread().getName() + ".logSyncer-" + hlogIndexID);
@@ -909,7 +909,7 @@ public class HLog implements Syncable {
   }
 
   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
-    Path newPath = getHLogArchivePath(this.oldLogDir, p);
+    Path newPath = getHLogArchivePath(this.oldLogDir, p, this.fs, this.conf);
     LOG.info(hlogName + "moving old hlog file " + FSUtils.getPath(p) +
       " whose highest sequence/edit id is " + seqno + " to " +
       FSUtils.getPath(newPath));
@@ -951,11 +951,10 @@ public class HLog implements Syncable {
     close();
     FileStatus[] files = fs.listStatus(this.dir);
     for(FileStatus file : files) {
-      fs.rename(file.getPath(),
-          getHLogArchivePath(this.oldLogDir, file.getPath()));
+      Path newPath = getHLogArchivePath(this.oldLogDir, file.getPath(), fs, this.conf);
+      fs.rename(file.getPath(), newPath);
+      LOG.debug(hlogName + "Moved log file " + file + " to " + newPath);
     }
-    LOG.debug(hlogName + "Moved " + files.length + " log files to " +
-        FSUtils.getPath(this.oldLogDir));
     fs.delete(dir, true);
   }
 
@@ -978,11 +977,11 @@ public class HLog implements Syncable {
     } catch (InterruptedException e) {
       LOG.error(hlogName + "Exception while waiting for syncer thread to die", e);
     }
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("closing hlog writer in " + this.dir.toString());
     }
-    
+
     cacheFlushLock.writeLock().lock();
     try {
       synchronized (updateLock) {
@@ -1040,7 +1039,7 @@ public class HLog implements Syncable {
 
     long len = edits.getTotalKeyValueLength();
     long txid = 0;
-    
+
     long start = System.currentTimeMillis();
     byte[] regionName = info.getRegionName();
 
@@ -1053,20 +1052,20 @@ public class HLog implements Syncable {
       long seqNum = obtainSeqNum();
       this.firstSeqWrittenInCurrentMemstore.putIfAbsent(regionName, seqNum);
       HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
-      
+
       doWrite(info, logKey, edits);
       // Only count 1 row as an unflushed entry.
       txid = this.unflushedEntries.incrementAndGet();
     }
-    
-    // Update the metrics 
+
+    // Update the metrics
     this.numEntries.incrementAndGet();
     writeSize.inc(len);
 
     // sync txn to file system
     start = System.currentTimeMillis();
     this.sync(info.isMetaRegion(), txid);
-    
+
     // Update the metrics and log down the outliers
     long end = System.currentTimeMillis();
     long syncTime = end - start;
@@ -1077,7 +1076,7 @@ public class HLog implements Syncable {
         Thread.currentThread().getName(), syncTime, this.numEntries.get(),
         StringUtils.humanReadableInt(len)));
     }
-    
+
     // Update the per-request profiling data
     Call call = HRegionServer.callContext.get();
     ProfilingData pData = call == null ? null : call.getProfilingData();
@@ -1165,7 +1164,7 @@ public class HLog implements Syncable {
       } catch (InterruptedException e) {
         LOG.debug(getName() + " interrupted while waiting for sync requests");
         if (unflushedEntries.get() != syncTillHere) {
-          syncFailureAbortStrategy.abort("LogSyncer interrupted before it" + 
+          syncFailureAbortStrategy.abort("LogSyncer interrupted before it" +
               " could sync everything. Aborting JVM", e);
         }
       } finally {
@@ -1248,7 +1247,7 @@ public class HLog implements Syncable {
           syncFailureAbortStrategy.abort(hlogName + "Could not sync hlog. Aborting", e);
         }
       }
-      
+
       // if the number of replicas in HDFS has fallen below the initial
       // value, then roll logs.
       try {
@@ -1265,7 +1264,7 @@ public class HLog implements Syncable {
           LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
                    " still proceeding ahead...");
       }
-      
+
       try {
         if (isUnderReplication || (this.writer.getLength() > this.logrollsize)) {
           requestLogRoll();
@@ -1275,7 +1274,7 @@ public class HLog implements Syncable {
       }
     }
   }
-  
+
   /**
    * This method gets the datanode replication count for the current HLog.
    *
@@ -1298,7 +1297,7 @@ public class HLog implements Syncable {
   boolean canGetCurReplicas() {
     return this.getNumCurrentReplicas != null;
   }
-  
+
   private void requestLogRoll() {
     if (this.listener != null) {
       this.listener.logRollRequested();
@@ -1309,7 +1308,7 @@ public class HLog implements Syncable {
   throws IOException {
     this.logBuffer.appendToBuffer(new Entry(logKey, logEdit));
   }
-  
+
   /** @return How many items have been added to the log */
   int getNumEntries() {
     return numEntries.get();
@@ -1458,7 +1457,7 @@ public class HLog implements Syncable {
           files = NO_FILES;
         }
         for(FileStatus file : files) {
-          Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+          Path newPath = getHLogArchivePath(oldLogDir, file.getPath(), fs, conf);
           LOG.info("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
                      FSUtils.getPath(newPath));
           fs.rename(file.getPath(), newPath);
@@ -1743,8 +1742,29 @@ public class HLog implements Syncable {
     return pattern.matcher(filename).matches();
   }
 
-  static Path getHLogArchivePath(Path oldLogDir, Path p) {
-    return new Path(oldLogDir, p.getName());
+  static Path getHLogArchivePath(Path oldLogDir, Path p, FileSystem fs,
+                                 Configuration conf) throws IOException {
+    String filename = p.getName();
+
+    // if subdirectories are disabled...
+    if (conf != null && !conf.getBoolean(
+        HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE,
+        HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT)) {
+      return new Path(oldLogDir, filename);
+    }
+
+    if (!validateHLogFilename(filename)) {
+      LOG.warn("Malformed Log file name: " + filename);
+      return new Path(oldLogDir, filename);
+    }
+
+    // since the filename is a valid name, we know there
+    // is a last '.' (won't return -1)
+    String subDirectoryName = filename.substring(0, filename.lastIndexOf('.'));
+    Path oldLogsSubDir = new Path(oldLogDir, subDirectoryName);
+    fs.mkdirs(oldLogsSubDir);
+
+    return new Path(oldLogsSubDir, filename);
   }
 
   /**
@@ -1979,7 +1999,7 @@ public class HLog implements Syncable {
     }
 
     for (Path p: processedLogs) {
-      Path newPath = getHLogArchivePath(oldLogDir, p);
+      Path newPath = getHLogArchivePath(oldLogDir, p, fs, conf);
       if (fs.exists(p)) {
         if (!fs.rename(p, newPath)) {
           LOG.warn("Unable to move  " + p + " to " + newPath);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Dec 17 19:19:43 2012
@@ -82,7 +82,7 @@ public class TestHLogSplit {
   private static final int NUM_WRITERS = 10;
   private static final int ENTRIES = 10; // entries per writer per region
   private static final int NUM_CLOSE_THREADS = 10;
-  
+
   private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
   private long seq = 0;
   private static final byte[] TABLE_NAME = "t1".getBytes();
@@ -174,6 +174,7 @@ public class TestHLogSplit {
     HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+    originalLog = (fs.listStatus(originalLog))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
 
     assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog));
@@ -359,6 +360,7 @@ public class TestHLogSplit {
     HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
 
     FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
+    archivedLogs = fs.listStatus(archivedLogs[0].getPath());
 
     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
   }
@@ -730,6 +732,8 @@ public class TestHLogSplit {
         logfile.getPath().toString(), conf);
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+    originalLog = (fs.listStatus(originalLog))[0].getPath();
+
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
 
     assertEquals(true, logsAreEqual(originalLog, splitLog));

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java Mon Dec 17 19:19:43 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTru
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -151,8 +152,12 @@ public class TestMutationWriteToWAL exte
     final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
     int nLogFilesRead = 0;
     List<String> actualLogEntries = new ArrayList<String>();
-    for (FileStatus logFile : fs.listStatus(oldLogDir)) {
+    ArrayDeque<FileStatus> checkQueue = new ArrayDeque<FileStatus>(
+      java.util.Arrays.asList(fs.listStatus(oldLogDir)));
+    while (!checkQueue.isEmpty()) {
+      FileStatus logFile = checkQueue.pop();
       if (logFile.isDir()) {
+        checkQueue.addAll(java.util.Arrays.asList(fs.listStatus(logFile.getPath())));
         continue;
       }
       HLog.Reader r = HLog.getReader(fs, logFile.getPath(), conf);