You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/12 20:02:43 UTC

[2/2] hbase git commit: Fix archiving of pv2 WAL files

Fix archiving of pv2 WAL files


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61c90473
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61c90473
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61c90473

Branch: refs/heads/HBASE-14614
Commit: 61c9047329234f6cefcc53edbd807d21b3aea740
Parents: 3d9d69b
Author: Michael Stack <st...@apache.org>
Authored: Fri May 12 13:02:32 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Fri May 12 13:02:32 2017 -0700

----------------------------------------------------------------------
 .../procedure2/store/wal/ProcedureWALFile.java  | 28 ++++++++++----------
 .../procedure2/store/wal/WALProcedureStore.java | 28 +++++++++++++-------
 .../org/apache/hadoop/hbase/master/HMaster.java |  4 ++-
 .../assignment/TestAssignmentManager.java       |  2 +-
 4 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 2221cfc..42abe8f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
@@ -156,22 +155,23 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     this.logSize += size;
   }
 
-  public void removeFile() throws IOException {
+  public void removeFile(final Path walArchiveDir) throws IOException {
     close();
-    // TODO: FIX THIS. MAKE THIS ARCHIVE FORMAL.
-    Path archiveDir =
-        new Path(logFile.getParent().getParent(), HConstants.HFILE_ARCHIVE_DIRECTORY);
-    try {
-      fs.mkdirs(archiveDir);
-    } catch (IOException ioe) {
-      LOG.warn("Making " + archiveDir, ioe);
+    boolean archived = false;
+    if (walArchiveDir != null) {
+      Path archivedFile = new Path(walArchiveDir, logFile.getName());
+      LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + walArchiveDir);
+      if (!fs.rename(logFile, archivedFile)) {
+        LOG.warn("Failed archive of " + logFile + ", deleting");
+      } else {
+        archived = true;
+      }
     }
-    Path archivedFile = new Path(archiveDir, logFile.getName());
-    LOG.info("ARCHIVED WAL (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile);
-    if (!fs.rename(logFile, archivedFile)) {
-      LOG.warn("Failed archive of " + logFile);
+    if (!archived) {
+      if (!fs.delete(logFile, false)) {
+        LOG.warn("Failed delete of " + logFile);
+      }
     }
-    // fs.delete(logFile, false);
   }
 
   public void setProcIds(long minId, long maxId) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 300e023..df818fe 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -124,6 +124,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
   private final Configuration conf;
   private final FileSystem fs;
   private final Path walDir;
+  private final Path walArchiveDir;
 
   private final AtomicReference<Throwable> syncException = new AtomicReference<>();
   private final AtomicBoolean loading = new AtomicBoolean(true);
@@ -185,9 +186,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
   public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
       final LeaseRecovery leaseRecovery) {
+    this(conf, fs, walDir, null, leaseRecovery);
+  }
+
+  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
+      final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
     this.fs = fs;
     this.conf = conf;
     this.walDir = walDir;
+    this.walArchiveDir = walArchiveDir;
     this.leaseRecovery = leaseRecovery;
   }
 
@@ -343,7 +350,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
           }
-          logs.getLast().removeFile();
+          logs.getLast().removeFile(this.walArchiveDir);
           continue;
         }
 
@@ -955,7 +962,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // but we should check if someone else has created new files
     if (getMaxLogId(getLogFiles()) > flushLogId) {
       LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
-      logs.getLast().removeFile();
+      logs.getLast().removeFile(this.walArchiveDir);
       return false;
     }
 
@@ -1047,7 +1054,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
     // Once there is nothing holding the oldest WAL we can remove it.
     while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
-      removeLogFile(logs.getFirst());
+      removeLogFile(logs.getFirst(), walArchiveDir);
       buildHoldingCleanupTracker();
     }
 
@@ -1089,7 +1096,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       if (lastLogId < log.getLogId()) {
         break;
       }
-      removeLogFile(log);
+      removeLogFile(log, walArchiveDir);
       removed = true;
     }
 
@@ -1098,12 +1105,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  private boolean removeLogFile(final ProcedureWALFile log) {
+  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
     try {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing log=" + log);
       }
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       logs.remove(log);
       if (LOG.isDebugEnabled()) {
         LOG.info("Removed log=" + log + " activeLogs=" + logs);
@@ -1192,7 +1199,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
         }
 
         maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
-        ProcedureWALFile log = initOldLog(logFiles[i]);
+        ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
         if (log != null) {
           this.logs.add(log);
         }
@@ -1222,11 +1229,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
   /**
    * Loads given log file and its tracker.
    */
-  private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
+  throws IOException {
     final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
     if (logFile.getLen() == 0) {
       LOG.warn("Remove uninitialized log: " + logFile);
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       return null;
     }
     if (LOG.isDebugEnabled()) {
@@ -1236,7 +1244,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       log.open();
     } catch (ProcedureWALFormat.InvalidWALDataException e) {
       LOG.warn("Remove uninitialized log: " + logFile, e);
-      log.removeFile();
+      log.removeFile(walArchiveDir);
       return null;
     } catch (IOException e) {
       String msg = "Unable to read state log: " + logFile;

http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0a2d1ba..5c6fdda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1111,8 +1111,10 @@ public class HMaster extends HRegionServer implements MasterServices {
     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
     final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
+    final Path archiveWalDir = new Path(new Path(FSUtils.getWALRootDir(this.conf),
+        HConstants.HFILE_ARCHIVE_DIRECTORY), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
 
-    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
+    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, archiveWalDir,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();

http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 9afb63f..dda41e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -121,7 +121,7 @@ public class TestAssignmentManager {
     conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
     conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
     conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
-    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 5);
+    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
   }
 
   @Before