You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/12 20:02:43 UTC
[2/2] hbase git commit: Fix archiving of pv2 WAL files
Fix archiving of pv2 WAL files
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61c90473
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61c90473
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61c90473
Branch: refs/heads/HBASE-14614
Commit: 61c9047329234f6cefcc53edbd807d21b3aea740
Parents: 3d9d69b
Author: Michael Stack <st...@apache.org>
Authored: Fri May 12 13:02:32 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Fri May 12 13:02:32 2017 -0700
----------------------------------------------------------------------
.../procedure2/store/wal/ProcedureWALFile.java | 28 ++++++++++----------
.../procedure2/store/wal/WALProcedureStore.java | 28 +++++++++++++-------
.../org/apache/hadoop/hbase/master/HMaster.java | 4 ++-
.../assignment/TestAssignmentManager.java | 2 +-
4 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 2221cfc..42abe8f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
@@ -156,22 +155,23 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
this.logSize += size;
}
- public void removeFile() throws IOException {
+ public void removeFile(final Path walArchiveDir) throws IOException {
close();
- // TODO: FIX THIS. MAKE THIS ARCHIVE FORMAL.
- Path archiveDir =
- new Path(logFile.getParent().getParent(), HConstants.HFILE_ARCHIVE_DIRECTORY);
- try {
- fs.mkdirs(archiveDir);
- } catch (IOException ioe) {
- LOG.warn("Making " + archiveDir, ioe);
+ boolean archived = false;
+ if (walArchiveDir != null) {
+ Path archivedFile = new Path(walArchiveDir, logFile.getName());
+ LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + walArchiveDir);
+ if (!fs.rename(logFile, archivedFile)) {
+ LOG.warn("Failed archive of " + logFile + ", deleting");
+ } else {
+ archived = true;
+ }
}
- Path archivedFile = new Path(archiveDir, logFile.getName());
- LOG.info("ARCHIVED WAL (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile);
- if (!fs.rename(logFile, archivedFile)) {
- LOG.warn("Failed archive of " + logFile);
+ if (!archived) {
+ if (!fs.delete(logFile, false)) {
+ LOG.warn("Failed delete of " + logFile);
+ }
}
- // fs.delete(logFile, false);
}
public void setProcIds(long minId, long maxId) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 300e023..df818fe 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -124,6 +124,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final Configuration conf;
private final FileSystem fs;
private final Path walDir;
+ private final Path walArchiveDir;
private final AtomicReference<Throwable> syncException = new AtomicReference<>();
private final AtomicBoolean loading = new AtomicBoolean(true);
@@ -185,9 +186,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
final LeaseRecovery leaseRecovery) {
+ this(conf, fs, walDir, null, leaseRecovery);
+ }
+
+ public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
+ final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
this.fs = fs;
this.conf = conf;
this.walDir = walDir;
+ this.walArchiveDir = walArchiveDir;
this.leaseRecovery = leaseRecovery;
}
@@ -343,7 +350,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (LOG.isDebugEnabled()) {
LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
}
- logs.getLast().removeFile();
+ logs.getLast().removeFile(this.walArchiveDir);
continue;
}
@@ -955,7 +962,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// but we should check if someone else has created new files
if (getMaxLogId(getLogFiles()) > flushLogId) {
LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
- logs.getLast().removeFile();
+ logs.getLast().removeFile(this.walArchiveDir);
return false;
}
@@ -1047,7 +1054,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
// We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
// once there is nothing holding the oldest WAL we can remove it.
while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
- removeLogFile(logs.getFirst());
+ removeLogFile(logs.getFirst(), walArchiveDir);
buildHoldingCleanupTracker();
}
@@ -1089,7 +1096,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (lastLogId < log.getLogId()) {
break;
}
- removeLogFile(log);
+ removeLogFile(log, walArchiveDir);
removed = true;
}
@@ -1098,12 +1105,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
- private boolean removeLogFile(final ProcedureWALFile log) {
+ private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Removing log=" + log);
}
- log.removeFile();
+ log.removeFile(walArchiveDir);
logs.remove(log);
if (LOG.isDebugEnabled()) {
LOG.info("Removed log=" + log + " activeLogs=" + logs);
@@ -1192,7 +1199,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
- ProcedureWALFile log = initOldLog(logFiles[i]);
+ ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
if (log != null) {
this.logs.add(log);
}
@@ -1222,11 +1229,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
/**
* Loads given log file and its tracker.
*/
- private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
+ private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
+ throws IOException {
final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) {
LOG.warn("Remove uninitialized log: " + logFile);
- log.removeFile();
+ log.removeFile(walArchiveDir);
return null;
}
if (LOG.isDebugEnabled()) {
@@ -1236,7 +1244,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
log.open();
} catch (ProcedureWALFormat.InvalidWALDataException e) {
LOG.warn("Remove uninitialized log: " + logFile, e);
- log.removeFile();
+ log.removeFile(walArchiveDir);
return null;
} catch (IOException e) {
String msg = "Unable to read state log: " + logFile;
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0a2d1ba..5c6fdda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1111,8 +1111,10 @@ public class HMaster extends HRegionServer implements MasterServices {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
+ final Path archiveWalDir = new Path(new Path(FSUtils.getWALRootDir(this.conf),
+ HConstants.HFILE_ARCHIVE_DIRECTORY), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
- procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
+ procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, archiveWalDir,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c90473/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 9afb63f..dda41e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -121,7 +121,7 @@ public class TestAssignmentManager {
conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
- conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 5);
+ conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
}
@Before