You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text rendering).
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/06 04:53:58 UTC

svn commit: r1227951 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/main/java/org/apache/hadoop/hbase/zookeeper/ src/test/java/org/apache/hadoop/hbase/master/

Author: tedyu
Date: Fri Jan  6 03:53:58 2012
New Revision: 1227951

URL: http://svn.apache.org/viewvc?rev=1227951&view=rev
Log:
HBASE-5081  Distributed log splitting deleteNode races against splitLog retry (Prakash)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Jan  6 03:53:58 2012
@@ -470,6 +470,7 @@ Release 0.92.0 - Unreleased
                are shutdown at the same time (Ming Ma)
    HBASE-5094  The META can hold an entry for a region with a different server name from the one  
                actually in the AssignmentManager thus making the region inaccessible. (Ram)
+   HBASE-5081  Distributed log splitting deleteNode races against splitLog retry (Prakash)
 
   TESTS
    HBASE-4450  test for number of blocks read: to serve as baseline for expected

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Fri Jan  6 03:53:58 2012
@@ -270,12 +270,7 @@ public class MasterFileSystem {
     if (distributedLogSplitting) {
       splitLogManager.handleDeadWorkers(serverNames);
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
-      try {
-        splitLogSize = splitLogManager.splitLogDistributed(logDirs);
-      } catch (OrphanHLogAfterSplitException e) {
-        LOG.warn("Retrying distributed splitting for " + serverNames, e);
-        splitLogManager.splitLogDistributed(logDirs);
-      }
+      splitLogSize = splitLogManager.splitLogDistributed(logDirs);
       splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     } else {
       for(Path logDir: logDirs){

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Fri Jan  6 03:53:58 2012
@@ -107,6 +107,7 @@ public class SplitLogManager extends Zoo
   private long timeout;
   private long unassignedTimeout;
   private long lastNodeCreateTime = Long.MAX_VALUE;
+  public boolean ignoreZKDeleteForTesting = false;
 
   private ConcurrentMap<String, Task> tasks =
     new ConcurrentHashMap<String, Task>();
@@ -116,10 +117,12 @@ public class SplitLogManager extends Zoo
   private Object deadWorkersLock = new Object();
 
   /**
-   * Its OK to construct this object even when region-servers are not online. It
-   * does lookup the orphan tasks in zk but it doesn't block for them to be
-   * done.
-   *
+   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
+   * Stoppable, String, TaskFinisher)} that provides a task finisher for
+   * copying recovered edits to their final destination. The task finisher
+   * has to be robust because it can be arbitrarily restarted or called
+   * multiple times.
+   * 
    * @param zkw
    * @param conf
    * @param stopper
@@ -142,6 +145,18 @@ public class SplitLogManager extends Zoo
       }
     });
   }
+
+  /**
+   * It's OK to construct this object even when region-servers are not online. It
+   * does lookup the orphan tasks in zk but it doesn't block waiting for them
+   * to be done.
+   *
+   * @param zkw
+   * @param conf
+   * @param stopper
+   * @param serverName
+   * @param tf task finisher 
+   */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
       Stoppable stopper, String serverName, TaskFinisher tf) {
     super(zkw);
@@ -194,8 +209,6 @@ public class SplitLogManager extends Zoo
           fileStatus.add(status);
       }
     }
-    if (fileStatus.isEmpty())
-      return null;
     FileStatus[] a = new FileStatus[fileStatus.size()];
     return fileStatus.toArray(a);
   }
@@ -228,8 +241,6 @@ public class SplitLogManager extends Zoo
     MonitoredTask status = TaskMonitor.get().createStatus(
           "Doing distributed log split in " + logDirs);
     FileStatus[] logfiles = getFileList(logDirs);
-    if(logfiles == null)
-      return 0;
     status.setStatus("Checking directory contents...");
     LOG.debug("Scheduling batch of logs to split");
     tot_mgr_log_split_batch_start.incrementAndGet();
@@ -251,7 +262,7 @@ public class SplitLogManager extends Zoo
     }
     waitForSplittingCompletion(batch, status);
     if (batch.done != batch.installed) {
-      stopTrackingTasks(batch);
+      batch.isDead = true;
       tot_mgr_log_split_batch_err.incrementAndGet();
       LOG.warn("error while splitting logs in " + logDirs +
       " installed = " + batch.installed + " but only " + batch.done + " done");
@@ -259,17 +270,21 @@ public class SplitLogManager extends Zoo
           + logDirs + " Task = " + batch);
     }
     for(Path logDir: logDirs){
-      if (anyNewLogFiles(logDir, logfiles)) {
-        tot_mgr_new_unexpected_hlogs.incrementAndGet();
-        LOG.warn("new hlogs were produced while logs in " + logDir +
-          " were being split");
-        throw new OrphanHLogAfterSplitException();
-      }
-      tot_mgr_log_split_batch_success.incrementAndGet();
       status.setStatus("Cleaning up log directory...");
-      if (!fs.delete(logDir, true)) {
-        throw new IOException("Unable to delete src dir: " + logDir);
+      try {
+        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
+          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+        }
+      } catch (IOException ioe) {
+        FileStatus[] files = fs.listStatus(logDir);
+        if (files != null && files.length > 0) {
+          LOG.warn("returning success without actually splitting and " + 
+              "deleting all the log files in path " + logDir);
+        } else {
+          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
+        }
       }
+      tot_mgr_log_split_batch_success.incrementAndGet();
     }
     String msg = "finished splitting (more than or equal to) " + totalSize +
         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
@@ -295,8 +310,6 @@ public class SplitLogManager extends Zoo
       createNode(path, zkretries);
       return true;
     }
-    LOG.warn(path + "is already being split. " +
-        "Two threads cannot wait for the same task");
     return false;
   }
 
@@ -323,15 +336,6 @@ public class SplitLogManager extends Zoo
   }
 
   private void setDone(String path, TerminationStatus status) {
-    if (!ZKSplitLog.isRescanNode(watcher, path)) {
-      if (status == SUCCESS) {
-        tot_mgr_log_split_success.incrementAndGet();
-        LOG.info("Done splitting " + path);
-      } else {
-        tot_mgr_log_split_err.incrementAndGet();
-        LOG.warn("Error splitting " + path);
-      }
-    }
     Task task = tasks.get(path);
     if (task == null) {
       if (!ZKSplitLog.isRescanNode(watcher, path)) {
@@ -340,18 +344,24 @@ public class SplitLogManager extends Zoo
       }
     } else {
       synchronized (task) {
-        task.deleted = true;
-        // if in stopTrackingTasks() we were to make tasks orphan instead of
-        // forgetting about them then we will have to handle the race when
-        // accessing task.batch here.
-        if (!task.isOrphan()) {
-          synchronized (task.batch) {
-            if (status == SUCCESS) {
-              task.batch.done++;
-            } else {
-              task.batch.error++;
+        if (task.status == IN_PROGRESS) {
+          if (status == SUCCESS) {
+            tot_mgr_log_split_success.incrementAndGet();
+            LOG.info("Done splitting " + path);
+          } else {
+            tot_mgr_log_split_err.incrementAndGet();
+            LOG.warn("Error splitting " + path);
+          }
+          task.status = status;
+          if (task.batch != null) {
+            synchronized (task.batch) {
+              if (status == SUCCESS) {
+                task.batch.done++;
+              } else {
+                task.batch.error++;
+              }
+              task.batch.notify();
             }
-            task.batch.notify();
           }
         }
       }
@@ -394,6 +404,11 @@ public class SplitLogManager extends Zoo
 
   private void getDataSetWatchSuccess(String path, byte[] data, int version) {
     if (data == null) {
+      if (version == Integer.MIN_VALUE) {
+        // assume all done. The task znode suddenly disappeared.
+        setDone(path, SUCCESS);
+        return;
+      }
       tot_mgr_null_data.incrementAndGet();
       LOG.fatal("logic error - got null data " + path);
       setDone(path, FAILURE);
@@ -480,7 +495,7 @@ public class SplitLogManager extends Zoo
       ResubmitDirective directive) {
     // its ok if this thread misses the update to task.deleted. It will
     // fail later
-    if (task.deleted) {
+    if (task.status != IN_PROGRESS) {
       return false;
     }
     int version;
@@ -490,7 +505,8 @@ public class SplitLogManager extends Zoo
         return false;
       }
       if (task.unforcedResubmits >= resubmit_threshold) {
-        if (task.unforcedResubmits == resubmit_threshold) {
+        if (!task.resubmitThresholdReached) {
+          task.resubmitThresholdReached = true;
           tot_mgr_resubmit_threshold_reached.incrementAndGet();
           LOG.info("Skipping resubmissions of task " + path +
               " because threshold " + resubmit_threshold + " reached");
@@ -514,7 +530,9 @@ public class SplitLogManager extends Zoo
         return false;
       }
     } catch (NoNodeException e) {
-      LOG.debug("failed to resubmit " + path + " task done");
+      LOG.warn("failed to resubmit because znode doesn't exist " + path +
+          " task done (or forced done by removing the znode)");
+      getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
       return false;
     } catch (KeeperException e) {
       tot_mgr_resubmit_failed.incrementAndGet();
@@ -539,12 +557,18 @@ public class SplitLogManager extends Zoo
 
   private void deleteNode(String path, Long retries) {
     tot_mgr_node_delete_queued.incrementAndGet();
+    // Once a task znode is ready for deletion, that is, once it is in the
+    // TASK_DONE state, no one should be writing to it anymore. In particular,
+    // no one will be updating the znode version any more.
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
       delete(path, -1, new DeleteAsyncCallback(),
         retries);
   }
 
   private void deleteNodeSuccess(String path) {
+    if (ignoreZKDeleteForTesting) {
+      return;
+    }
     Task task;
     task = tasks.remove(path);
     if (task == null) {
@@ -555,6 +579,10 @@ public class SplitLogManager extends Zoo
       LOG.debug("deleted task without in memory state " + path);
       return;
     }
+    synchronized (task) {
+      task.status = DELETED;
+      task.notify();
+    }
     tot_mgr_task_deleted.incrementAndGet();
   }
 
@@ -603,59 +631,67 @@ public class SplitLogManager extends Zoo
     Task oldtask;
     // batch.installed is only changed via this function and
     // a single thread touches batch.installed.
-    oldtask = tasks.putIfAbsent(path, new Task(batch));
-    if (oldtask != null) {
-      // new task was not used.
-      batch.installed--;
-      synchronized (oldtask) {
-        if (oldtask.isOrphan()) {
-          if (oldtask.deleted) {
-            // The task is already done. Do not install the batch for this
-            // task because it might be too late for setDone() to update
-            // batch.done. There is no need for the batch creator to wait for
-            // this task to complete.
-            return (null);
+    Task newtask = new Task();
+    newtask.batch = batch;
+    oldtask = tasks.putIfAbsent(path, newtask);
+    if (oldtask == null) {
+      batch.installed++;
+      return  null;
+    }
+    // new task was not used.
+    synchronized (oldtask) {
+      if (oldtask.isOrphan()) {
+        if (oldtask.status == SUCCESS) {
+          // The task is already done. Do not install the batch for this
+          // task because it might be too late for setDone() to update
+          // batch.done. There is no need for the batch creator to wait for
+          // this task to complete.
+          return (null);
+        }
+        if (oldtask.status == IN_PROGRESS) {
+          oldtask.batch = batch;
+          batch.installed++;
+          LOG.debug("Previously orphan task " + path +
+              " is now being waited upon");
+          return null;
+        }
+        while (oldtask.status == FAILURE) {
+          LOG.debug("wait for status of task " + path +
+              " to change to DELETED");
+          tot_mgr_wait_for_zk_delete.incrementAndGet();
+          try {
+            oldtask.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted when waiting for znode delete callback");
+            // fall through to return failure
+            break;
           }
-          oldtask.setBatch(batch);
         }
-      }
-      LOG.info("Previously orphan task " + path +
-          " is now being waited upon");
-      return (null);
-    }
-    return oldtask;
-  }
-
-  /**
-   * This function removes any knowledge of this batch's tasks from the
-   * manager. It doesn't actually stop the active tasks. If the tasks are
-   * resubmitted then the active tasks will be reacquired and monitored by the
-   * manager. It is important to call this function when batch processing
-   * terminates prematurely, otherwise if the tasks are re-submitted
-   * then they might fail.
-   * <p>
-   * there is a slight race here. even after a task has been removed from
-   * {@link #tasks} someone who had acquired a reference to it will continue to
-   * process the task. That is OK since we don't actually change the task and
-   * the batch objects.
-   * <p>
-   * TODO Its  probably better to convert these to orphan tasks but then we
-   * have to deal with race conditions as we nullify Task's batch pointer etc.
-   * <p>
-   * @param batch
-   */
-  void stopTrackingTasks(TaskBatch batch) {
-    for (Map.Entry<String, Task> e : tasks.entrySet()) {
-      String path = e.getKey();
-      Task t = e.getValue();
-      if (t.batch == batch) { // == is correct. equals not necessary.
-        tasks.remove(path);
-      }
+        if (oldtask.status != DELETED) {
+          LOG.warn("Failure because previously failed task" +
+              " state still present. Waiting for znode delete callback" +
+              " path=" + path);
+          return oldtask;
+        }
+        // reinsert the newTask and it must succeed this time
+        Task t = tasks.putIfAbsent(path, newtask);
+        if (t == null) {
+          batch.installed++;
+          return  null;
+        }
+        LOG.fatal("Logic error. Deleted task still present in tasks map");
+        assert false : "Deleted task still present in tasks map";
+        return t;
+      }
+      LOG.warn("Failure because two threads can't wait for the same task. " +
+          " path=" + path);
+      return oldtask;
     }
   }
 
   Task findOrCreateOrphanTask(String path) {
-    Task orphanTask = new Task(null);
+    Task orphanTask = new Task();
     Task task;
     task = tasks.putIfAbsent(path, orphanTask);
     if (task == null) {
@@ -716,9 +752,10 @@ public class SplitLogManager extends Zoo
    * All access is synchronized.
    */
   static class TaskBatch {
-    int installed;
-    int done;
-    int error;
+    int installed = 0;
+    int done = 0;
+    int error = 0;
+    volatile boolean isDead = false;
 
     @Override
     public String toString() {
@@ -731,45 +768,35 @@ public class SplitLogManager extends Zoo
    * in memory state of an active task.
    */
   static class Task {
-    long last_update;
-    int last_version;
-    String cur_worker_name;
+    volatile long last_update;
+    volatile int last_version;
+    volatile String cur_worker_name;
     TaskBatch batch;
-    boolean deleted;
-    int incarnation;
-    int unforcedResubmits;
+    volatile TerminationStatus status;
+    volatile int incarnation;
+    volatile int unforcedResubmits;
+    volatile boolean resubmitThresholdReached;
 
     @Override
     public String toString() {
       return ("last_update = " + last_update +
           " last_version = " + last_version +
           " cur_worker_name = " + cur_worker_name +
-          " deleted = " + deleted +
+          " status = " + status +
           " incarnation = " + incarnation +
           " resubmits = " + unforcedResubmits +
           " batch = " + batch);
     }
 
-    Task(TaskBatch tb) {
+    Task() {
       incarnation = 0;
       last_version = -1;
-      deleted = false;
-      setBatch(tb);
+      status = IN_PROGRESS;
       setUnassigned();
     }
 
-    public void setBatch(TaskBatch batch) {
-      if (batch != null && this.batch != null) {
-        LOG.fatal("logic error - batch being overwritten");
-      }
-      this.batch = batch;
-      if (batch != null) {
-        batch.installed++;
-      }
-    }
-
     public boolean isOrphan() {
-      return (batch == null);
+      return (batch == null || batch.isDead);
     }
 
     public boolean isUnassigned() {
@@ -882,6 +909,16 @@ public class SplitLogManager extends Zoo
       if (tot > 0 && !found_assigned_task &&
           ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
           unassignedTimeout)) {
+        for (Map.Entry<String, Task> e : tasks.entrySet()) {
+          String path = e.getKey();
+          Task task = e.getValue();
+          // we have to do this check again because tasks might have
+          // been asynchronously assigned.
+          if (task.isUnassigned()) {
+            // We just touch the znode to make sure it's still there
+            getDataSetWatch(path, zkretries);
+          }
+        }
         createRescanNode(Long.MAX_VALUE);
         tot_mgr_resubmit_unassigned.incrementAndGet();
         LOG.debug("resubmitting unassigned task(s) after timeout");
@@ -901,6 +938,12 @@ public class SplitLogManager extends Zoo
       tot_mgr_node_create_result.incrementAndGet();
       if (rc != 0) {
         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+          // What if there is a delete pending against this pre-existing
+          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
+          // state. The only operations this manager will carry out on this
+          // node are get-znode-data, task-finisher and delete-znode.
+          // And all code pieces correctly handle the case of suddenly
+          // disappearing task-znode.
           LOG.debug("found pre-existing znode " + path);
           tot_mgr_node_already_exists.incrementAndGet();
         } else {
@@ -933,6 +976,15 @@ public class SplitLogManager extends Zoo
         Stat stat) {
       tot_mgr_get_data_result.incrementAndGet();
       if (rc != 0) {
+        if (rc == KeeperException.Code.NONODE.intValue()) {
+          tot_mgr_get_data_nonode.incrementAndGet();
+          // The task znode has been deleted. Must be some pending delete
+          // that deleted the task. Assume success because a task-znode is
+          // only deleted after TaskFinisher is successful.
+          LOG.warn("task znode " + path + " vanished.");
+          getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          return;
+        }
         Long retry_count = (Long) ctx;
         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
             path + " remaining retries=" + retry_count);
@@ -974,9 +1026,10 @@ public class SplitLogManager extends Zoo
           }
           return;
         } else {
-        LOG.debug(path
-            + " does not exist, either was never created or was deleted"
-            + " in earlier rounds, zkretries = " + (Long) ctx);
+        LOG.debug(path +
+            " does not exist. Either was created but deleted behind our" +
+            " back by another pending delete OR was deleted" +
+            " in earlier retry rounds. zkretries = " + (Long) ctx);
         }
       } else {
         LOG.debug("deleted " + path);
@@ -1014,46 +1067,10 @@ public class SplitLogManager extends Zoo
   }
 
   /**
-   * checks whether any new files have appeared in logDir which were
-   * not present in the original logfiles set
-   * @param logdir
-   * @param logfiles
-   * @return True if a new log file is found
-   * @throws IOException
-   */
-  public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles)
-  throws IOException {
-    if (logdir == null) {
-      return false;
-    }
-    LOG.debug("re-listing " + logdir);
-    tot_mgr_relist_logdir.incrementAndGet();
-    FileStatus[] newfiles = FSUtils.listStatus(fs, logdir, null);
-    if (newfiles == null) {
-      return false;
-    }
-    boolean matched;
-    for (FileStatus newfile : newfiles) {
-      matched = false;
-      for (FileStatus origfile : logfiles) {
-        if (origfile.equals(newfile)) {
-          matched = true;
-          break;
-        }
-      }
-      if (matched == false) {
-        LOG.warn("Discovered orphan hlog " + newfile + " after split." +
-        " Maybe HRegionServer was not dead when we started");
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * {@link SplitLogManager} can use objects implementing this interface to
    * finish off a partially done task by {@link SplitLogWorker}. This provides
-   * a serialization point at the end of the task processing.
+   * a serialization point at the end of the task processing. Must be
+   * restartable and idempotent.
    */
   static public interface TaskFinisher {
     /**
@@ -1085,7 +1102,19 @@ public class SplitLogManager extends Zoo
     FORCE();
   }
   enum TerminationStatus {
-    SUCCESS(),
-    FAILURE();
+    IN_PROGRESS("in_progress"),
+    SUCCESS("success"),
+    FAILURE("failure"),
+    DELETED("deleted");
+
+    String statusMsg;
+    TerminationStatus(String msg) {
+      statusMsg = msg;
+    }
+    
+    @Override
+    public String toString() {
+      return statusMsg;
+    }
   }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Fri Jan  6 03:53:58 2012
@@ -541,16 +541,21 @@ public class HLogSplitter {
       if (ZKSplitLog.isCorruptFlagFile(dst)) {
         continue;
       }
-      if (fs.exists(dst)) {
-        fs.delete(dst, false);
-      } else {
-        Path dstdir = dst.getParent();
-        if (!fs.exists(dstdir)) {
-          if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+      if (fs.exists(src)) {
+        if (fs.exists(dst)) {
+          fs.delete(dst, false);
+        } else {
+          Path dstdir = dst.getParent();
+          if (!fs.exists(dstdir)) {
+            if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+          }
         }
+        fs.rename(src, dst);
+        LOG.debug(" moved " + src + " => " + dst);
+      } else {
+        LOG.debug("Could not move recovered edits from " + src +
+            " as it doesn't exist");
       }
-      fs.rename(src, dst);
-      LOG.debug(" moved " + src + " => " + dst);
     }
     archiveLogs(null, corruptedLogs, processedLogs,
         oldLogDir, fs, conf);
@@ -600,24 +605,32 @@ public class HLogSplitter {
     }
     fs.mkdirs(oldLogDir);
 
+    // this method can get restarted or called multiple times for archiving
+    // the same log files.
     for (Path corrupted : corruptedLogs) {
       Path p = new Path(corruptDir, corrupted.getName());
-      if (!fs.rename(corrupted, p)) {
-        LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
-      } else {
-        LOG.info("Moving corrupted log " + corrupted + " to " + p);
+      if (fs.exists(corrupted)) {
+        if (!fs.rename(corrupted, p)) {
+          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+        } else {
+          LOG.warn("Moving corrupted log " + corrupted + " to " + p);
+        }
       }
     }
 
     for (Path p : processedLogs) {
       Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
-      if (!fs.rename(p, newPath)) {
-        LOG.info("Unable to move  " + p + " to " + newPath);
-      } else {
-        LOG.info("Archived processed log " + p + " to " + newPath);
+      if (fs.exists(p)) {
+        if (!fs.rename(p, newPath)) {
+          LOG.warn("Unable to move  " + p + " to " + newPath);
+        } else {
+          LOG.debug("Archived processed log " + p + " to " + newPath);
+        }
       }
     }
 
+    // distributed log splitting removes the srcDir (region's log dir) later
+    // when all the log files in that srcDir have been successfully processed
     if (srcDir != null && !fs.delete(srcDir, true)) {
       throw new IOException("Unable to delete src dir: " + srcDir);
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Fri Jan  6 03:53:58 2012
@@ -215,6 +215,7 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
     public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
@@ -224,6 +225,7 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
     public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
     public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
     public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
     public static AtomicLong tot_mgr_resubmit_threshold_reached =
       new AtomicLong(0);

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Fri Jan  6 03:53:58 2012
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -142,51 +143,6 @@ public class TestDistributedLogSplitting
     ht.close();
   }
 
-  @Test(expected=OrphanHLogAfterSplitException.class, timeout=300000)
-  public void testOrphanLogCreation() throws Exception {
-    LOG.info("testOrphanLogCreation");
-    startCluster(NUM_RS);
-    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
-    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    HRegionServer hrs = rsts.get(0).getRegionServer();
-    Path rootdir = FSUtils.getRootDir(conf);
-    final Path logDir = new Path(rootdir,
-        HLog.getHLogDirectoryName(hrs.getServerName().toString()));
-
-    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
-        "table", "family", 40);
-
-    makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
-        1000, 100);
-
-    new Thread() {
-      public void run() {
-        while (true) {
-          int i = 0;
-          try {
-            while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
-              0) {
-              Thread.yield();
-            }
-            fs.createNewFile(new Path(logDir, "foo" + i++));
-          } catch (Exception e) {
-            LOG.debug("file creation failed", e);
-            return;
-          }
-        }
-      }
-    }.start();
-    slm.splitLogDistributed(logDir);
-    FileStatus[] files = fs.listStatus(logDir);
-    if (files != null) {
-      for (FileStatus file : files) {
-        LOG.debug("file still there " + file.getPath());
-      }
-    }
-  }
-
   @Test (timeout=300000)
   public void testRecoveredEdits() throws Exception {
     LOG.info("testRecoveredEdits");
@@ -309,6 +265,65 @@ public class TestDistributedLogSplitting
         "tot_wkr_preempt_task");
   }
 
+  /**
+   * A split of a corrupted log must fail and leave the log file in place; a
+   * retry is then expected to wait for the failed task's znode to be deleted
+   * (ZK deletes are suppressed via ignoreZKDeleteForTesting) until the test
+   * thread interrupts it.
+   */
+  @Test (timeout=300000)
+  public void testDelayedDeleteOnFailure() throws Exception {
+    LOG.info("testDelayedDeleteOnFailure");
+    startCluster(1);
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+    final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
+    fs.mkdirs(logDir);
+    final Path corruptedLogFile = new Path(logDir, "x");
+    FSDataOutputStream out = fs.create(corruptedLogFile);
+    try {
+      out.write(0);
+      out.write(Bytes.toBytes("corrupted bytes"));
+    } finally {
+      out.close();
+    }
+    slm.ignoreZKDeleteForTesting = true;
+    // JUnit only reports failures raised on the test thread, so the helper
+    // thread records any assertion failure here and the test thread
+    // rethrows it after join().
+    final Throwable[] failure = new Throwable[1];
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          try {
+            slm.splitLogDistributed(logDir);
+            fail("did not get the expected IOException from the 1st call");
+          } catch (IOException ioe) {
+            // the failed split must leave the log file in place for a retry
+            assertTrue(fs.exists(corruptedLogFile));
+          }
+          try {
+            // with ZK deletes suppressed this call is expected to wait until
+            // the test thread interrupts it
+            slm.splitLogDistributed(logDir);
+            fail("did not get the expected IOException from the 2nd call");
+          } catch (IOException e) {
+            assertTrue(Thread.currentThread().isInterrupted());
+          }
+        } catch (Throwable th) {
+          failure[0] = th;
+        }
+      }
+    };
+    t.start();
+    waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
+    t.interrupt();
+    t.join();
+    if (failure[0] != null) {
+      throw new AssertionError(failure[0]);
+    }
+  }
+
   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
       int nrs ) throws Exception {
     // Create a table with regions

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1227951&r1=1227950&r2=1227951&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Fri Jan  6 03:53:58 2012
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.zo
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.*;
@@ -31,6 +32,7 @@ import static org.junit.Assert.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
@@ -112,19 +114,34 @@ public class TestSplitLogManager {
     TEST_UTIL.shutdownMiniZKCluster();
   }
 
-  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+  private interface Expr {
+    public long eval();
+  }
+
+  private void waitForCounter(final AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    Expr e = new Expr() {
+      public long eval() {
+        return ctr.get();
+      }
+    };
+    waitForCounter(e, oldval, newval, timems);
+    return;
+  }
+
+  private void waitForCounter(Expr e, long oldval, long newval,
       long timems) {
     long curt = System.currentTimeMillis();
     long endt = curt + timems;
     while (curt < endt) {
-      if (ctr.get() == oldval) {
+      if (e.eval() == oldval) {
         try {
           Thread.sleep(10);
-        } catch (InterruptedException e) {
+        } catch (InterruptedException eintr) {
         }
         curt = System.currentTimeMillis();
       } else {
-        assertEquals(newval, ctr.get());
+        assertEquals(newval, e.eval());
         return;
       }
     }
@@ -267,10 +284,8 @@ public class TestSplitLogManager {
   public void testRescanCleanup() throws Exception {
     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
 
-    int to = 1000;
-    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeout", 1000);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
-    to = to + 2 * 100;
     slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
@@ -280,14 +295,23 @@ public class TestSplitLogManager {
 
     ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1"));
     waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
-    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
-    int version1 = ZKUtil.checkExists(zkw, tasknode);
-    assertTrue(version1 > version);
-    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
-    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
-        taskstate));
-
-    waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+    waitForCounter(new Expr() {
+      @Override
+      public long eval() {
+        return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
+      }
+    }, 0, 1, 5*60000); // wait long enough
+    if (tot_mgr_resubmit_failed.get() == 0) {
+      int version1 = ZKUtil.checkExists(zkw, tasknode);
+      assertTrue(version1 > version);
+      byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+      assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+          taskstate));
+      
+      waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+    } else {
+      LOG.warn("Could not run test. Lost ZK connection?");
+    }
 
     return;
   }
@@ -419,6 +443,72 @@ public class TestSplitLogManager {
     return;
   }
 
+  /**
+   * Splitting an empty log directory should succeed and remove the directory.
+   */
+  @Test
+  public void testEmptyLogDir() throws Exception {
+    LOG.info("testEmptyLogDir");
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
+        UUID.randomUUID().toString());
+    fs.mkdirs(emptyLogDirPath);
+    slm.splitLogDistributed(emptyLogDirPath);
+    assertFalse(fs.exists(emptyLogDirPath));
+  }
+
+  /**
+   * If a task znode disappears while the manager is waiting on it, the batch
+   * should still complete (tot_mgr_log_split_batch_success) and the log file
+   * must not be removed.
+   */
+  @Test
+  public void testVanishingTaskZNode() throws Exception {
+    LOG.info("testVanishingTaskZNode");
+    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    final Path logDir = new Path(fs.getWorkingDirectory(),
+        UUID.randomUUID().toString());
+    fs.mkdirs(logDir);
+    Path logFile = new Path(logDir, UUID.randomUUID().toString());
+    fs.createNewFile(logFile);
+    // JUnit only reports failures raised on the test thread, so the helper
+    // thread records any exception here for the test thread to rethrow.
+    final Throwable[] failure = new Throwable[1];
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          // this call will block because there are no SplitLogWorkers
+          slm.splitLogDistributed(logDir);
+        } catch (Throwable th) {
+          LOG.warn("splitLogDistributed failed", th);
+          failure[0] = th;
+        }
+      }
+    };
+    t.start();
+    waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
+    String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
+    // remove the task znode
+    ZKUtil.deleteNode(zkw, znode);
+    waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
+    waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000);
+    assertTrue(fs.exists(logFile));
+    // the batch has completed, so splitLogDistributed() should return; join
+    // so the helper thread does not outlive the test
+    t.join(30000);
+    assertFalse("splitLogDistributed() did not return", t.isAlive());
+    if (failure[0] != null) {
+      throw new AssertionError(failure[0]);
+    }
+    fs.delete(logDir, true);
+  }
+
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();