You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/06 04:51:30 UTC
svn commit: r1227950 - in /hbase/branches/0.92: ./
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/main/java/org/apache/hadoop/hbase/zookeeper/
src/test/java/org/apache/hadoop/hbase/master/
Author: tedyu
Date: Fri Jan 6 03:51:29 2012
New Revision: 1227950
URL: http://svn.apache.org/viewvc?rev=1227950&view=rev
Log:
HBASE-5081 Distributed log splitting deleteNode races against splitLog retry (Prakash)
Modified:
hbase/branches/0.92/CHANGES.txt
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Fri Jan 6 03:51:29 2012
@@ -506,6 +506,7 @@ Release 0.92.0 - Unreleased
shutdown handler for the region sever the root region was on (Jimmy)
HBASE-5094 The META can hold an entry for a region with a different server name from
the one actually in the AssignmentManager thus making the region inaccessible.(Ram)
+ HBASE-5081 Distributed log splitting deleteNode races against splitLog retry (Prakash)
TESTS
HBASE-4492 TestRollingRestart fails intermittently
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Fri Jan 6 03:51:29 2012
@@ -272,12 +272,7 @@ public class MasterFileSystem {
splitLogManager.handleDeadWorker(serverName.toString());
}
splitTime = EnvironmentEdgeManager.currentTimeMillis();
- try {
- splitLogSize = splitLogManager.splitLogDistributed(logDirs);
- } catch (OrphanHLogAfterSplitException e) {
- LOG.warn("Retrying distributed splitting for " + serverNames, e);
- splitLogManager.splitLogDistributed(logDirs);
- }
+ splitLogSize = splitLogManager.splitLogDistributed(logDirs);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
} else {
for(Path logDir: logDirs){
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Fri Jan 6 03:51:29 2012
@@ -106,6 +106,7 @@ public class SplitLogManager extends Zoo
private long timeout;
private long unassignedTimeout;
private long lastNodeCreateTime = Long.MAX_VALUE;
+ public boolean ignoreZKDeleteForTesting = false;
private ConcurrentMap<String, Task> tasks =
new ConcurrentHashMap<String, Task>();
@@ -115,10 +116,12 @@ public class SplitLogManager extends Zoo
private Object deadWorkersLock = new Object();
/**
- * Its OK to construct this object even when region-servers are not online. It
- * does lookup the orphan tasks in zk but it doesn't block for them to be
- * done.
- *
+ * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
+ * Stoppable, String, TaskFinisher)} that provides a task finisher for
+ * copying recovered edits to their final destination. The task finisher
+ * has to be robust because it can be arbitrarily restarted or called
+ * multiple times.
+ *
* @param zkw
* @param conf
* @param stopper
@@ -141,6 +144,18 @@ public class SplitLogManager extends Zoo
}
});
}
+
+ /**
+ * It's OK to construct this object even when region-servers are not online. It
+ * does lookup the orphan tasks in zk but it doesn't block waiting for them
+ * to be done.
+ *
+ * @param zkw
+ * @param conf
+ * @param stopper
+ * @param serverName
+ * @param tf task finisher
+ */
public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
Stoppable stopper, String serverName, TaskFinisher tf) {
super(zkw);
@@ -193,8 +208,6 @@ public class SplitLogManager extends Zoo
fileStatus.add(status);
}
}
- if (fileStatus.isEmpty())
- return null;
FileStatus[] a = new FileStatus[fileStatus.size()];
return fileStatus.toArray(a);
}
@@ -227,8 +240,6 @@ public class SplitLogManager extends Zoo
MonitoredTask status = TaskMonitor.get().createStatus(
"Doing distributed log split in " + logDirs);
FileStatus[] logfiles = getFileList(logDirs);
- if(logfiles == null)
- return 0;
status.setStatus("Checking directory contents...");
LOG.debug("Scheduling batch of logs to split");
tot_mgr_log_split_batch_start.incrementAndGet();
@@ -250,7 +261,7 @@ public class SplitLogManager extends Zoo
}
waitTasks(batch, status);
if (batch.done != batch.installed) {
- stopTrackingTasks(batch);
+ batch.isDead = true;
tot_mgr_log_split_batch_err.incrementAndGet();
LOG.warn("error while splitting logs in " + logDirs +
" installed = " + batch.installed + " but only " + batch.done + " done");
@@ -258,17 +269,21 @@ public class SplitLogManager extends Zoo
+ logDirs + " Task = " + batch);
}
for(Path logDir: logDirs){
- if (anyNewLogFiles(logDir, logfiles)) {
- tot_mgr_new_unexpected_hlogs.incrementAndGet();
- LOG.warn("new hlogs were produced while logs in " + logDir +
- " were being split");
- throw new OrphanHLogAfterSplitException();
- }
- tot_mgr_log_split_batch_success.incrementAndGet();
status.setStatus("Cleaning up log directory...");
- if (!fs.delete(logDir, true)) {
- throw new IOException("Unable to delete src dir: " + logDir);
+ try {
+ if (fs.exists(logDir) && !fs.delete(logDir, false)) {
+ LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+ }
+ } catch (IOException ioe) {
+ FileStatus[] files = fs.listStatus(logDir);
+ if (files != null && files.length > 0) {
+ LOG.warn("returning success without actually splitting and " +
+ "deleting all the log files in path " + logDir);
+ } else {
+ LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
+ }
}
+ tot_mgr_log_split_batch_success.incrementAndGet();
}
String msg = "finished splitting (more than or equal to) " + totalSize +
" bytes in " + batch.installed + " log files in " + logDirs + " in " +
@@ -287,8 +302,6 @@ public class SplitLogManager extends Zoo
createNode(path, zkretries);
return true;
}
- LOG.warn(path + "is already being split. " +
- "Two threads cannot wait for the same task");
return false;
}
@@ -315,15 +328,6 @@ public class SplitLogManager extends Zoo
}
private void setDone(String path, TerminationStatus status) {
- if (!ZKSplitLog.isRescanNode(watcher, path)) {
- if (status == SUCCESS) {
- tot_mgr_log_split_success.incrementAndGet();
- LOG.info("Done splitting " + path);
- } else {
- tot_mgr_log_split_err.incrementAndGet();
- LOG.warn("Error splitting " + path);
- }
- }
Task task = tasks.get(path);
if (task == null) {
if (!ZKSplitLog.isRescanNode(watcher, path)) {
@@ -332,18 +336,24 @@ public class SplitLogManager extends Zoo
}
} else {
synchronized (task) {
- task.deleted = true;
- // if in stopTrackingTasks() we were to make tasks orphan instead of
- // forgetting about them then we will have to handle the race when
- // accessing task.batch here.
- if (!task.isOrphan()) {
- synchronized (task.batch) {
- if (status == SUCCESS) {
- task.batch.done++;
- } else {
- task.batch.error++;
+ if (task.status == IN_PROGRESS) {
+ if (status == SUCCESS) {
+ tot_mgr_log_split_success.incrementAndGet();
+ LOG.info("Done splitting " + path);
+ } else {
+ tot_mgr_log_split_err.incrementAndGet();
+ LOG.warn("Error splitting " + path);
+ }
+ task.status = status;
+ if (task.batch != null) {
+ synchronized (task.batch) {
+ if (status == SUCCESS) {
+ task.batch.done++;
+ } else {
+ task.batch.error++;
+ }
+ task.batch.notify();
}
- task.batch.notify();
}
}
}
@@ -386,6 +396,11 @@ public class SplitLogManager extends Zoo
private void getDataSetWatchSuccess(String path, byte[] data, int version) {
if (data == null) {
+ if (version == Integer.MIN_VALUE) {
+ // assume all done. The task znode suddenly disappeared.
+ setDone(path, SUCCESS);
+ return;
+ }
tot_mgr_null_data.incrementAndGet();
LOG.fatal("logic error - got null data " + path);
setDone(path, FAILURE);
@@ -472,7 +487,7 @@ public class SplitLogManager extends Zoo
ResubmitDirective directive) {
// its ok if this thread misses the update to task.deleted. It will
// fail later
- if (task.deleted) {
+ if (task.status != IN_PROGRESS) {
return false;
}
int version;
@@ -482,7 +497,8 @@ public class SplitLogManager extends Zoo
return false;
}
if (task.unforcedResubmits >= resubmit_threshold) {
- if (task.unforcedResubmits == resubmit_threshold) {
+ if (!task.resubmitThresholdReached) {
+ task.resubmitThresholdReached = true;
tot_mgr_resubmit_threshold_reached.incrementAndGet();
LOG.info("Skipping resubmissions of task " + path +
" because threshold " + resubmit_threshold + " reached");
@@ -506,7 +522,9 @@ public class SplitLogManager extends Zoo
return false;
}
} catch (NoNodeException e) {
- LOG.debug("failed to resubmit " + path + " task done");
+ LOG.warn("failed to resubmit because znode doesn't exist " + path +
+ " task done (or forced done by removing the znode)");
+ getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
return false;
} catch (KeeperException e) {
tot_mgr_resubmit_failed.incrementAndGet();
@@ -531,12 +549,18 @@ public class SplitLogManager extends Zoo
private void deleteNode(String path, Long retries) {
tot_mgr_node_delete_queued.incrementAndGet();
+ // Once a task znode is ready for delete, that is, it is in the TASK_DONE
+ // state, then no one should be writing to it anymore. That is no one
+ // will be updating the znode version any more.
this.watcher.getRecoverableZooKeeper().getZooKeeper().
delete(path, -1, new DeleteAsyncCallback(),
retries);
}
private void deleteNodeSuccess(String path) {
+ if (ignoreZKDeleteForTesting) {
+ return;
+ }
Task task;
task = tasks.remove(path);
if (task == null) {
@@ -547,6 +571,10 @@ public class SplitLogManager extends Zoo
LOG.debug("deleted task without in memory state " + path);
return;
}
+ synchronized (task) {
+ task.status = DELETED;
+ task.notify();
+ }
tot_mgr_task_deleted.incrementAndGet();
}
@@ -595,59 +623,67 @@ public class SplitLogManager extends Zoo
Task oldtask;
// batch.installed is only changed via this function and
// a single thread touches batch.installed.
- oldtask = tasks.putIfAbsent(path, new Task(batch));
- if (oldtask != null) {
- // new task was not used.
- batch.installed--;
- synchronized (oldtask) {
- if (oldtask.isOrphan()) {
- if (oldtask.deleted) {
- // The task is already done. Do not install the batch for this
- // task because it might be too late for setDone() to update
- // batch.done. There is no need for the batch creator to wait for
- // this task to complete.
- return (null);
+ Task newtask = new Task();
+ newtask.batch = batch;
+ oldtask = tasks.putIfAbsent(path, newtask);
+ if (oldtask == null) {
+ batch.installed++;
+ return null;
+ }
+ // new task was not used.
+ synchronized (oldtask) {
+ if (oldtask.isOrphan()) {
+ if (oldtask.status == SUCCESS) {
+ // The task is already done. Do not install the batch for this
+ // task because it might be too late for setDone() to update
+ // batch.done. There is no need for the batch creator to wait for
+ // this task to complete.
+ return (null);
+ }
+ if (oldtask.status == IN_PROGRESS) {
+ oldtask.batch = batch;
+ batch.installed++;
+ LOG.debug("Previously orphan task " + path +
+ " is now being waited upon");
+ return null;
+ }
+ while (oldtask.status == FAILURE) {
+ LOG.debug("wait for status of task " + path +
+ " to change to DELETED");
+ tot_mgr_wait_for_zk_delete.incrementAndGet();
+ try {
+ oldtask.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted when waiting for znode delete callback");
+ // fall through to return failure
+ break;
}
- oldtask.setBatch(batch);
}
- }
- LOG.info("Previously orphan task " + path +
- " is now being waited upon");
- return (null);
- }
- return oldtask;
- }
-
- /**
- * This function removes any knowledge of this batch's tasks from the
- * manager. It doesn't actually stop the active tasks. If the tasks are
- * resubmitted then the active tasks will be reacquired and monitored by the
- * manager. It is important to call this function when batch processing
- * terminates prematurely, otherwise if the tasks are re-submitted
- * then they might fail.
- * <p>
- * there is a slight race here. even after a task has been removed from
- * {@link #tasks} someone who had acquired a reference to it will continue to
- * process the task. That is OK since we don't actually change the task and
- * the batch objects.
- * <p>
- * TODO Its probably better to convert these to orphan tasks but then we
- * have to deal with race conditions as we nullify Task's batch pointer etc.
- * <p>
- * @param batch
- */
- void stopTrackingTasks(TaskBatch batch) {
- for (Map.Entry<String, Task> e : tasks.entrySet()) {
- String path = e.getKey();
- Task t = e.getValue();
- if (t.batch == batch) { // == is correct. equals not necessary.
- tasks.remove(path);
- }
+ if (oldtask.status != DELETED) {
+ LOG.warn("Failure because previously failed task" +
+ " state still present. Waiting for znode delete callback" +
+ " path=" + path);
+ return oldtask;
+ }
+ // reinsert the newTask and it must succeed this time
+ Task t = tasks.putIfAbsent(path, newtask);
+ if (t == null) {
+ batch.installed++;
+ return null;
+ }
+ LOG.fatal("Logic error. Deleted task still present in tasks map");
+ assert false : "Deleted task still present in tasks map";
+ return t;
+ }
+ LOG.warn("Failure because two threads can't wait for the same task. " +
+ " path=" + path);
+ return oldtask;
}
}
Task findOrCreateOrphanTask(String path) {
- Task orphanTask = new Task(null);
+ Task orphanTask = new Task();
Task task;
task = tasks.putIfAbsent(path, orphanTask);
if (task == null) {
@@ -708,9 +744,10 @@ public class SplitLogManager extends Zoo
* All access is synchronized.
*/
static class TaskBatch {
- int installed;
- int done;
- int error;
+ int installed = 0;
+ int done = 0;
+ int error = 0;
+ volatile boolean isDead = false;
@Override
public String toString() {
@@ -723,45 +760,35 @@ public class SplitLogManager extends Zoo
* in memory state of an active task.
*/
static class Task {
- long last_update;
- int last_version;
- String cur_worker_name;
+ volatile long last_update;
+ volatile int last_version;
+ volatile String cur_worker_name;
TaskBatch batch;
- boolean deleted;
- int incarnation;
- int unforcedResubmits;
+ volatile TerminationStatus status;
+ volatile int incarnation;
+ volatile int unforcedResubmits;
+ volatile boolean resubmitThresholdReached;
@Override
public String toString() {
return ("last_update = " + last_update +
" last_version = " + last_version +
" cur_worker_name = " + cur_worker_name +
- " deleted = " + deleted +
+ " status = " + status +
" incarnation = " + incarnation +
" resubmits = " + unforcedResubmits +
" batch = " + batch);
}
- Task(TaskBatch tb) {
+ Task() {
incarnation = 0;
last_version = -1;
- deleted = false;
- setBatch(tb);
+ status = IN_PROGRESS;
setUnassigned();
}
- public void setBatch(TaskBatch batch) {
- if (batch != null && this.batch != null) {
- LOG.fatal("logic error - batch being overwritten");
- }
- this.batch = batch;
- if (batch != null) {
- batch.installed++;
- }
- }
-
public boolean isOrphan() {
- return (batch == null);
+ return (batch == null || batch.isDead);
}
public boolean isUnassigned() {
@@ -860,6 +887,16 @@ public class SplitLogManager extends Zoo
if (tot > 0 && !found_assigned_task &&
((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
unassignedTimeout)) {
+ for (Map.Entry<String, Task> e : tasks.entrySet()) {
+ String path = e.getKey();
+ Task task = e.getValue();
+ // we have to do this check again because tasks might have
+ // been asynchronously assigned.
+ if (task.isUnassigned()) {
+ // We just touch the znode to make sure its still there
+ getDataSetWatch(path, zkretries);
+ }
+ }
createRescanNode(Long.MAX_VALUE);
tot_mgr_resubmit_unassigned.incrementAndGet();
LOG.debug("resubmitting unassigned task(s) after timeout");
@@ -879,6 +916,12 @@ public class SplitLogManager extends Zoo
tot_mgr_node_create_result.incrementAndGet();
if (rc != 0) {
if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+ // What if there is a delete pending against this pre-existing
+ // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
+ // state. Only operations that will be carried out on this node by
+ // this manager are get-znode-data, task-finisher and delete-znode.
+ // And all code pieces correctly handle the case of suddenly
+ // disappearing task-znode.
LOG.debug("found pre-existing znode " + path);
tot_mgr_node_already_exists.incrementAndGet();
} else {
@@ -911,6 +954,15 @@ public class SplitLogManager extends Zoo
Stat stat) {
tot_mgr_get_data_result.incrementAndGet();
if (rc != 0) {
+ if (rc == KeeperException.Code.NONODE.intValue()) {
+ tot_mgr_get_data_nonode.incrementAndGet();
+ // The task znode has been deleted. Must be some pending delete
+ // that deleted the task. Assume success because a task-znode
+ // is only deleted after TaskFinisher is successful.
+ LOG.warn("task znode " + path + " vanished.");
+ getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+ return;
+ }
Long retry_count = (Long) ctx;
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
path + " retry=" + retry_count);
@@ -952,9 +1004,10 @@ public class SplitLogManager extends Zoo
}
return;
} else {
- LOG.debug(path
- + " does not exist, either was never created or was deleted"
- + " in earlier rounds, zkretries = " + (Long) ctx);
+ LOG.debug(path +
+ " does not exist. Either was created but deleted behind our" +
+ " back by another pending delete OR was deleted" +
+ " in earlier retry rounds. zkretries = " + (Long) ctx);
}
} else {
LOG.debug("deleted " + path);
@@ -992,46 +1045,10 @@ public class SplitLogManager extends Zoo
}
/**
- * checks whether any new files have appeared in logDir which were
- * not present in the original logfiles set
- * @param logdir
- * @param logfiles
- * @return True if a new log file is found
- * @throws IOException
- */
- public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles)
- throws IOException {
- if (logdir == null) {
- return false;
- }
- LOG.debug("re-listing " + logdir);
- tot_mgr_relist_logdir.incrementAndGet();
- FileStatus[] newfiles = FSUtils.listStatus(fs, logdir, null);
- if (newfiles == null) {
- return false;
- }
- boolean matched;
- for (FileStatus newfile : newfiles) {
- matched = false;
- for (FileStatus origfile : logfiles) {
- if (origfile.equals(newfile)) {
- matched = true;
- break;
- }
- }
- if (matched == false) {
- LOG.warn("Discovered orphan hlog " + newfile + " after split." +
- " Maybe HRegionServer was not dead when we started");
- return true;
- }
- }
- return false;
- }
-
- /**
* {@link SplitLogManager} can use objects implementing this interface to
* finish off a partially done task by {@link SplitLogWorker}. This provides
- * a serialization point at the end of the task processing.
+ * a serialization point at the end of the task processing. Must be
+ * restartable and idempotent.
*/
static public interface TaskFinisher {
/**
@@ -1063,7 +1080,19 @@ public class SplitLogManager extends Zoo
FORCE();
}
enum TerminationStatus {
- SUCCESS(),
- FAILURE();
+ IN_PROGRESS("in_progress"),
+ SUCCESS("success"),
+ FAILURE("failure"),
+ DELETED("deleted");
+
+ String statusMsg;
+ TerminationStatus(String msg) {
+ statusMsg = msg;
+ }
+
+ @Override
+ public String toString() {
+ return statusMsg;
+ }
}
}
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Fri Jan 6 03:51:29 2012
@@ -541,16 +541,21 @@ public class HLogSplitter {
if (ZKSplitLog.isCorruptFlagFile(dst)) {
continue;
}
- if (fs.exists(dst)) {
- fs.delete(dst, false);
- } else {
- Path dstdir = dst.getParent();
- if (!fs.exists(dstdir)) {
- if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+ if (fs.exists(src)) {
+ if (fs.exists(dst)) {
+ fs.delete(dst, false);
+ } else {
+ Path dstdir = dst.getParent();
+ if (!fs.exists(dstdir)) {
+ if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+ }
}
+ fs.rename(src, dst);
+ LOG.debug(" moved " + src + " => " + dst);
+ } else {
+ LOG.debug("Could not move recovered edits from " + src +
+ " as it doesn't exist");
}
- fs.rename(src, dst);
- LOG.debug(" moved " + src + " => " + dst);
}
archiveLogs(null, corruptedLogs, processedLogs,
oldLogDir, fs, conf);
@@ -600,24 +605,32 @@ public class HLogSplitter {
}
fs.mkdirs(oldLogDir);
+ // this method can get restarted or called multiple times for archiving
+ // the same log files.
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
- if (!fs.rename(corrupted, p)) {
- LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
- } else {
- LOG.info("Moving corrupted log " + corrupted + " to " + p);
+ if (fs.exists(corrupted)) {
+ if (!fs.rename(corrupted, p)) {
+ LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+ } else {
+ LOG.warn("Moving corrupted log " + corrupted + " to " + p);
+ }
}
}
for (Path p : processedLogs) {
Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
- if (!fs.rename(p, newPath)) {
- LOG.info("Unable to move " + p + " to " + newPath);
- } else {
- LOG.info("Archived processed log " + p + " to " + newPath);
+ if (fs.exists(p)) {
+ if (!fs.rename(p, newPath)) {
+ LOG.warn("Unable to move " + p + " to " + newPath);
+ } else {
+ LOG.debug("Archived processed log " + p + " to " + newPath);
+ }
}
}
+ // distributed log splitting removes the srcDir (region's log dir) later
+ // when all the log files in that srcDir have been successfully processed
if (srcDir != null && !fs.delete(srcDir, true)) {
throw new IOException("Unable to delete src dir: " + srcDir);
}
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Fri Jan 6 03:51:29 2012
@@ -215,6 +215,7 @@ public class ZKSplitLog {
public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+ public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
@@ -224,6 +225,7 @@ public class ZKSplitLog {
public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+ public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
public static AtomicLong tot_mgr_resubmit_threshold_reached =
new AtomicLong(0);
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Fri Jan 6 03:51:29 2012
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -143,51 +144,6 @@ public class TestDistributedLogSplitting
TEST_UTIL.countRows(ht));
}
- @Test(expected=OrphanHLogAfterSplitException.class, timeout=300000)
- public void testOrphanLogCreation() throws Exception {
- LOG.info("testOrphanLogCreation");
- startCluster(NUM_RS);
- final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
- final FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
- List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
- HRegionServer hrs = rsts.get(0).getRegionServer();
- Path rootdir = FSUtils.getRootDir(conf);
- final Path logDir = new Path(rootdir,
- HLog.getHLogDirectoryName(hrs.getServerName().toString()));
-
- installTable(new ZooKeeperWatcher(conf, "table-creation", null),
- "table", "family", 40);
-
- makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
- 1000, 100);
-
- new Thread() {
- public void run() {
- while (true) {
- int i = 0;
- try {
- while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
- 0) {
- Thread.yield();
- }
- fs.createNewFile(new Path(logDir, "foo" + i++));
- } catch (Exception e) {
- LOG.debug("file creation failed", e);
- return;
- }
- }
- }
- }.start();
- slm.splitLogDistributed(logDir);
- FileStatus[] files = fs.listStatus(logDir);
- if (files != null) {
- for (FileStatus file : files) {
- LOG.debug("file still there " + file.getPath());
- }
- }
- }
-
@Test (timeout=300000)
public void testRecoveredEdits() throws Exception {
LOG.info("testRecoveredEdits");
@@ -310,6 +266,45 @@ public class TestDistributedLogSplitting
"tot_wkr_preempt_task");
}
+ @Test
+ public void testDelayedDeleteOnFailure() throws Exception {
+ LOG.info("testDelayedDeleteOnFailure");
+ startCluster(1);
+ final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+ final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+ final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
+ fs.mkdirs(logDir);
+ final Path corruptedLogFile = new Path(logDir, "x");
+ FSDataOutputStream out;
+ out = fs.create(corruptedLogFile);
+ out.write(0);
+ out.write(Bytes.toBytes("corrupted bytes"));
+ out.close();
+ slm.ignoreZKDeleteForTesting = true;
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ slm.splitLogDistributed(logDir);
+ } catch (IOException ioe) {
+ try {
+ assertTrue(fs.exists(corruptedLogFile));
+ slm.splitLogDistributed(logDir);
+ } catch (IOException e) {
+ assertTrue(Thread.currentThread().isInterrupted());
+ return;
+ }
+ fail("did not get the expected IOException from the 2nd call");
+ }
+ fail("did not get the expected IOException from the 1st call");
+ }
+ };
+ t.start();
+ waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
+ t.interrupt();
+ t.join();
+ }
+
HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
int nrs ) throws Exception {
// Create a table with regions
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1227950&r1=1227949&r2=1227950&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Fri Jan 6 03:51:29 2012
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.zo
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
@@ -31,6 +32,7 @@ import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Stoppable;
@@ -112,19 +114,34 @@ public class TestSplitLogManager {
TEST_UTIL.shutdownMiniZKCluster();
}
- private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+ private interface Expr {
+ public long eval();
+ }
+
+ private void waitForCounter(final AtomicLong ctr, long oldval, long newval,
+ long timems) {
+ Expr e = new Expr() {
+ public long eval() {
+ return ctr.get();
+ }
+ };
+ waitForCounter(e, oldval, newval, timems);
+ return;
+ }
+
+ private void waitForCounter(Expr e, long oldval, long newval,
long timems) {
long curt = System.currentTimeMillis();
long endt = curt + timems;
while (curt < endt) {
- if (ctr.get() == oldval) {
+ if (e.eval() == oldval) {
try {
Thread.sleep(10);
- } catch (InterruptedException e) {
+ } catch (InterruptedException eintr) {
}
curt = System.currentTimeMillis();
} else {
- assertEquals(newval, ctr.get());
+ assertEquals(newval, e.eval());
return;
}
}
@@ -267,10 +284,8 @@ public class TestSplitLogManager {
public void testRescanCleanup() throws Exception {
LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
- int to = 1000;
- conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.timeout", 1000);
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
- to = to + 2 * 100;
slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
slm.finishInitialization();
TaskBatch batch = new TaskBatch();
@@ -280,14 +295,23 @@ public class TestSplitLogManager {
ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
- waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
- int version1 = ZKUtil.checkExists(zkw, tasknode);
- assertTrue(version1 > version);
- byte[] taskstate = ZKUtil.getData(zkw, tasknode);
- assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
- taskstate));
-
- waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+ waitForCounter(new Expr() {
+ @Override
+ public long eval() {
+ return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
+ }
+ }, 0, 1, 5*60000); // wait long enough
+ if (tot_mgr_resubmit_failed.get() == 0) {
+ int version1 = ZKUtil.checkExists(zkw, tasknode);
+ assertTrue(version1 > version);
+ byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+ assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+ taskstate));
+
+ waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+ } else {
+ LOG.warn("Could not run test. Lost ZK connection?");
+ }
return;
}
@@ -418,4 +442,50 @@ public class TestSplitLogManager {
taskstate));
return;
}
+
+ @Test
+ public void testEmptyLogDir() throws Exception {
+ LOG.info("testEmptyLogDir");
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
+ UUID.randomUUID().toString());
+ fs.mkdirs(emptyLogDirPath);
+ slm.splitLogDistributed(emptyLogDirPath);
+ assertFalse(fs.exists(emptyLogDirPath));
+ }
+
+ @Test
+ public void testVanishingTaskZNode() throws Exception {
+ LOG.info("testVanishingTaskZNode");
+ conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ final Path logDir = new Path(fs.getWorkingDirectory(),
+ UUID.randomUUID().toString());
+ fs.mkdirs(logDir);
+ Path logFile = new Path(logDir, UUID.randomUUID().toString());
+ fs.createNewFile(logFile);
+ new Thread() {
+ public void run() {
+ try {
+ // this call will block because there are no SplitLogWorkers
+ slm.splitLogDistributed(logDir);
+ } catch (Exception e) {
+ LOG.warn("splitLogDistributed failed", e);
+ fail();
+ }
+ }
+ }.start();
+ waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
+ String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
+ // remove the task znode
+ ZKUtil.deleteNode(zkw, znode);
+ waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
+ waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000);
+ assertTrue(fs.exists(logFile));
+ fs.delete(logDir, true);
+ }
}