Posted to commits@hbase.apache.org by jx...@apache.org on 2013/04/17 17:53:28 UTC

svn commit: r1468981 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/master/ test/...

Author: jxiang
Date: Wed Apr 17 15:53:27 2013
New Revision: 1468981

URL: http://svn.apache.org/r1468981
Log:
HBASE-8321 Log split worker should heartbeat to avoid timeout when the hlog is under recovery
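
The problem: while an HLog is still under HDFS lease recovery, the split
worker can block for minutes without renewing its ZooKeeper task, so the
SplitLogManager times the task out and resubmits it elsewhere. The fix
threads a CancelableProgressable "reporter" from the worker down through
lease recovery and reader creation, and throttles the actual ZooKeeper
heartbeat to at most once per report period (by default half the manager
timeout). A minimal, self-contained sketch of that throttling pattern;
CancelableProgressable is re-declared here only to keep the snippet
compilable, and renewOwnership() is a hypothetical stand-in for
SplitLogWorker.attemptToOwnTask(false):

    interface CancelableProgressable {
      // Report progress; return false to ask the caller to abort.
      boolean progress();
    }

    class ThrottledHeartbeat implements CancelableProgressable {
      private final long reportPeriodMs; // e.g. half the manager timeout
      private long lastReportAt = 0;

      ThrottledHeartbeat(long reportPeriodMs) {
        this.reportPeriodMs = reportPeriodMs;
      }

      @Override
      public boolean progress() {
        long now = System.currentTimeMillis();
        if (now - lastReportAt > reportPeriodMs) {
          lastReportAt = now;
          // Hypothetical: re-assert task ownership in ZooKeeper.
          return renewOwnership();
        }
        return true; // called again too soon; skip the ZooKeeper round trip
      }

      private boolean renewOwnership() {
        return true; // placeholder for attemptToOwnTask(false)
      }
    }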

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Apr 17 15:53:27 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.SplitLogT
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -81,13 +82,16 @@ public class SplitLogWorker extends ZooK
   private volatile boolean exitWorker;
   private final Object grabTaskLock = new Object();
   private boolean workerInGrabTask = false;
-
+  private final int report_period;
 
   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
       ServerName serverName, TaskExecutor splitTaskExecutor) {
     super(watcher);
     this.serverName = serverName;
     this.splitTaskExecutor = splitTaskExecutor;
+    report_period = conf.getInt("hbase.splitlog.report.period",
+      conf.getInt("hbase.splitlog.manager.timeout",
+        SplitLogManager.DEFAULT_TIMEOUT) / 2);
   }
 
   public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
@@ -274,15 +278,22 @@ public class SplitLogWorker extends ZooK
       status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
           new CancelableProgressable() {
 
+        private long last_report_at = 0;
+
         @Override
         public boolean progress() {
-          if (!attemptToOwnTask(false)) {
-            LOG.warn("Failed to heartbeat the task" + currentTask);
-            return false;
+          long t = EnvironmentEdgeManager.currentTimeMillis();
+          if ((t - last_report_at) > report_period) {
+            last_report_at = t;
+            if (!attemptToOwnTask(false)) {
+              LOG.warn("Failed to heartbeat the task " + currentTask);
+              return false;
+            }
           }
           return true;
         }
       });
+
       switch (status) {
         case DONE:
           endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
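
The new report_period field resolves as shown in the constructor above:
half the manager timeout unless "hbase.splitlog.report.period" is set
explicitly, so the worker heartbeats at least twice before the manager
could give up on the task. A compilable sketch of the lookup; the
120000 ms fallback is an assumption standing in for
SplitLogManager.DEFAULT_TIMEOUT:

    import org.apache.hadoop.conf.Configuration;

    public class ReportPeriodDemo {
      // Assumption: 120000 ms stands in for SplitLogManager.DEFAULT_TIMEOUT.
      static final int DEFAULT_MANAGER_TIMEOUT_MS = 120000;

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Heartbeat at half the manager timeout unless overridden.
        int reportPeriod = conf.getInt("hbase.splitlog.report.period",
            conf.getInt("hbase.splitlog.manager.timeout",
                DEFAULT_MANAGER_TIMEOUT_MS) / 2);
        System.out.println("report period = " + reportPeriod + " ms");
      }
    }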

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Wed Apr 17 15:53:27 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 public class HLogFactory {
@@ -70,6 +71,11 @@ public class HLogFactory {
       logReaderClass = null;
     }
 
+    public static HLog.Reader createReader(final FileSystem fs,
+        final Path path, Configuration conf) throws IOException {
+      return createReader(fs, path, conf, null);
+    }
+
     /**
      * Create a reader for the WAL. If you are reading from a file that's being written to
      * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
@@ -77,8 +83,8 @@ public class HLogFactory {
      * @return A WAL reader.  Close when done with it.
      * @throws IOException
      */
-    public static HLog.Reader createReader(final FileSystem fs,
-        final Path path, Configuration conf) throws IOException {
+    public static HLog.Reader createReader(final FileSystem fs, final Path path,
+        Configuration conf, CancelableProgressable reporter) throws IOException {
       if (logReaderClass == null) {
         logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
           SequenceFileLogReader.class, Reader.class);
@@ -102,6 +108,9 @@ public class HLogFactory {
               if (++nbAttempt == 1) {
                 LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
               }
+              if (reporter != null && !reporter.progress()) {
+                throw new InterruptedIOException("Operation is cancelled");
+              }
               if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
                 LOG.error("Can't open after " + nbAttempt + " attempts and "
                   + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
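
With the new overload, a caller that may hit a file still under lease
recovery passes a reporter; createReader() invokes reporter.progress() on
each retry and aborts with InterruptedIOException once it returns false.
A hedged usage sketch (the always-true reporter is illustrative; the real
split worker passes its throttled heartbeat callback instead):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
    import org.apache.hadoop.hbase.util.CancelableProgressable;

    public class ReaderWithHeartbeat {
      public static HLog.Reader open(FileSystem fs, Path walPath,
          Configuration conf) throws IOException {
        CancelableProgressable reporter = new CancelableProgressable() {
          @Override
          public boolean progress() {
            // Returning false here makes createReader() throw
            // InterruptedIOException instead of retrying indefinitely.
            return true;
          }
        };
        return HLogFactory.createReader(fs, walPath, conf, reporter);
      }
    }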

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Apr 17 15:53:27 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -113,9 +112,6 @@ public class HLogSplitter {
 
   private MonitoredTask status;
 
-  // Used in distributed log splitting
-  private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
-
   // For checking the latest flushed sequence id
   protected final LastSequenceId sequenceIdChecker;
 
@@ -263,10 +259,6 @@ public class HLogSplitter {
     return outputSink.getOutputCounts();
   }
 
-  void setDistributedLogSplittingHelper(DistributedLogSplittingHelper helper) {
-    this.distributedLogSplittingHelper = helper;
-  }
-
   /**
    * Splits the HLog edits in the given list of logfiles (that are a mix of edits
    * on multiple regions) by region and then splits them per region directories,
@@ -317,7 +309,7 @@ public class HLogSplitter {
           //meta only. However, there is a sequence number that can be obtained
           //only by parsing.. so we parse for all files currently
           //TODO: optimize this part somehow
-          in = getReader(fs, log, conf, skipErrors);
+          in = getReader(fs, log, conf, skipErrors, null);
           if (in != null) {
             parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
           }
@@ -420,53 +412,58 @@ public class HLogSplitter {
       CancelableProgressable reporter) throws IOException {
     boolean isCorrupted = false;
     Preconditions.checkState(status == null);
-    status = TaskMonitor.get().createStatus(
-        "Splitting log file " + logfile.getPath() +
-        "into a temporary staging area.");
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
-        HLog.SPLIT_SKIP_ERRORS_DEFAULT);
+      HLog.SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
     Path logPath = logfile.getPath();
-    long logLength = logfile.getLen();
-    LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
-    status.setStatus("Opening log file");
-    Reader in = null;
-    try {
-      in = getReader(fs, logfile, conf, skipErrors);
-    } catch (CorruptedLogFileException e) {
-      LOG.warn("Could not get reader, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
-      isCorrupted = true;
-    }
-    if (in == null) {
-      status.markComplete("Was nothing to split in log file");
-      LOG.warn("Nothing to split in log file " + logPath);
-      return true;
-    }
-    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
-    if (!reportProgressIfIsDistributedLogSplitting()) {
-      return false;
-    }
+    boolean outputSinkStarted = false;
     boolean progress_failed = false;
-    int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
-    int numOpenedFilesLastCheck = 0;
-    outputSink.startWriterThreads();
-    // Report progress every so many edits and/or files opened (opening a file
-    // takes a bit of time).
-    Map<byte[], Long> lastFlushedSequenceIds =
-      new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-    Entry entry;
     int editsCount = 0;
     int editsSkipped = 0;
+
     try {
+      status = TaskMonitor.get().createStatus(
+        "Splitting log file " + logfile.getPath() +
+        "into a temporary staging area.");
+      long logLength = logfile.getLen();
+      LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+      status.setStatus("Opening log file");
+      if (reporter != null && !reporter.progress()) {
+        progress_failed = true;
+        return false;
+      }
+      Reader in = null;
+      try {
+        in = getReader(fs, logfile, conf, skipErrors, reporter);
+      } catch (CorruptedLogFileException e) {
+        LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+        isCorrupted = true;
+      }
+      if (in == null) {
+        status.markComplete("Was nothing to split in log file");
+        LOG.warn("Nothing to split in log file " + logPath);
+        return true;
+      }
+      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
+      int numOpenedFilesLastCheck = 0;
+      outputSink.setReporter(reporter);
+      outputSink.startWriterThreads();
+      outputSinkStarted = true;
+      // Report progress every so many edits and/or files opened (opening a file
+      // takes a bit of time).
+      Map<byte[], Long> lastFlushedSequenceIds =
+        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      Entry entry;
+
       while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
         Long lastFlushedSequenceId = -1l;
         if (sequenceIdChecker != null) {
           lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
           if (lastFlushedSequenceId == null) {
-              lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
-              lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+            lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
+            lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
           }
         }
         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@@ -482,7 +479,8 @@ public class HLogSplitter {
           String countsStr = (editsCount - editsSkipped) +
             " edits, skipped " + editsSkipped + " edits.";
           status.setStatus("Split " + countsStr);
-          if (!reportProgressIfIsDistributedLogSplitting()) {
+          if (reporter != null && !reporter.progress()) {
+            progress_failed = true;
             return false;
           }
         }
@@ -500,12 +498,13 @@ public class HLogSplitter {
       throw e;
     } finally {
       LOG.info("Finishing writing output logs and closing down.");
-      progress_failed = outputSink.finishWritingAndClose() == null;
+      if (outputSinkStarted) {
+        progress_failed = outputSink.finishWritingAndClose() == null;
+      }
       String msg = "Processed " + editsCount + " edits across "
-          + outputSink.getOutputCounts().size() + " regions; log file="
-          + logPath + " is corrupted = " + isCorrupted + " progress failed = "
-          + progress_failed;
-      ;
+        + outputSink.getOutputCounts().size() + " regions; log file="
+        + logPath + " is corrupted = " + isCorrupted + " progress failed = "
+        + progress_failed;
       LOG.info(msg);
       status.markComplete(msg);
     }
@@ -620,6 +619,7 @@ public class HLogSplitter {
    * @return Path to file into which to dump split log edits.
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   static Path getRegionSplitEditsPath(final FileSystem fs,
       final Entry logEntry, final Path rootDir, boolean isCreate)
   throws IOException {
@@ -724,7 +724,7 @@ public class HLogSplitter {
    * @throws CorruptedLogFileException
    */
   protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
-      boolean skipErrors)
+      boolean skipErrors, CancelableProgressable reporter)
       throws IOException, CorruptedLogFileException {
     Path path = file.getPath();
     long length = file.getLen();
@@ -739,9 +739,9 @@ public class HLogSplitter {
     }
 
     try {
-      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf);
+      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
       try {
-        in = getReader(fs, path, conf);
+        in = getReader(fs, path, conf, reporter);
       } catch (EOFException e) {
         if (length <= 0) {
           // TODO should we ignore an empty, not-last log file if skip.errors
@@ -757,8 +757,8 @@ public class HLogSplitter {
         }
       }
     } catch (IOException e) {
-      if (!skipErrors) {
-        throw e;
+      if (!skipErrors || e instanceof InterruptedIOException) {
+        throw e; // Don't mark the file corrupted when interrupted or when skipErrors is off
       }
       CorruptedLogFileException t =
         new CorruptedLogFileException("skipErrors=true Could not open hlog " +
@@ -826,9 +826,9 @@ public class HLogSplitter {
   /**
    * Create a new {@link Reader} for reading logs to split.
    */
-  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
-      throws IOException {
-    return HLogFactory.createReader(fs, curLogFile, conf);
+  protected Reader getReader(FileSystem fs, Path curLogFile,
+      Configuration conf, CancelableProgressable reporter) throws IOException {
+    return HLogFactory.createReader(fs, curLogFile, conf, reporter);
   }
 
   /**
@@ -1108,56 +1108,6 @@ public class HLogSplitter {
     return ret;
   }
 
-  /***
-   * @return false if it is a distributed log splitting and it failed to report
-   *         progress
-   */
-  private boolean reportProgressIfIsDistributedLogSplitting() {
-    if (this.distributedLogSplittingHelper != null) {
-      return distributedLogSplittingHelper.reportProgress();
-    } else {
-      return true;
-    }
-  }
-
-  /**
-   * A class used in distributed log splitting
-   *
-   */
-  class DistributedLogSplittingHelper {
-    // Report progress, only used in distributed log splitting
-    private final CancelableProgressable splitReporter;
-    // How often to send a progress report (default 1/2 master timeout)
-    private final int report_period;
-    private long last_report_at = 0;
-
-    public DistributedLogSplittingHelper(CancelableProgressable reporter) {
-      this.splitReporter = reporter;
-      report_period = conf.getInt("hbase.splitlog.report.period",
-          conf.getInt("hbase.splitlog.manager.timeout",
-              SplitLogManager.DEFAULT_TIMEOUT) / 2);
-    }
-
-    /***
-     * @return false if reporter failed progressing
-     */
-    private boolean reportProgress() {
-      if (splitReporter == null) {
-        return true;
-      } else {
-        long t = EnvironmentEdgeManager.currentTimeMillis();
-        if ((t - last_report_at) > report_period) {
-          last_report_at = t;
-          if (this.splitReporter.progress() == false) {
-            LOG.warn("Failed: reporter.progress asked us to terminate");
-            return false;
-          }
-        }
-        return true;
-      }
-    }
-  }
-
   /**
    * Class that manages the output streams from the log splitting process.
    */
@@ -1178,6 +1128,8 @@ public class HLogSplitter {
 
     private final int numThreads;
 
+    private CancelableProgressable reporter = null;
+
     public OutputSink() {
       // More threads could potentially write faster at the expense
       // of causing more disk seeks as the logs are split.
@@ -1188,6 +1140,10 @@ public class HLogSplitter {
           "hbase.regionserver.hlog.splitlog.writer.threads", 3);
     }
 
+    void setReporter(CancelableProgressable reporter) {
+      this.reporter = reporter;
+    }
+
     /**
      * Start the threads that will pump data from the entryBuffers
      * to the output files.
@@ -1213,7 +1169,7 @@ public class HLogSplitter {
           t.finish();
         }
         for (WriterThread t : writerThreads) {
-          if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+          if (!progress_failed && reporter != null && !reporter.progress()) {
             progress_failed = true;
           }
           try {
@@ -1309,7 +1265,7 @@ public class HLogSplitter {
         for (int i = 0, n = logWriters.size(); i < n; i++) {
           Future<Void> future = completionService.take();
           future.get();
-          if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+          if (!progress_failed && reporter != null && !reporter.progress()) {
             progress_failed = true;
           }
         }
@@ -1437,8 +1393,6 @@ public class HLogSplitter {
     }
   }
 
-
-
   /**
    *  Private data structure that wraps a Writer and its Path,
    *  also collecting statistics about the data written to this
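
Net effect of the HLogSplitter changes: the per-splitter
DistributedLogSplittingHelper (with its own throttling) is gone, the
reporter is passed explicitly into splitLogFile(), getReader() and the
OutputSink, and the splitter consults it at natural checkpoints: before
opening the file, every "hbase.splitlog.report.interval.loglines" edits,
and while joining each writer thread. The repeated
"reporter != null && !reporter.progress()" test distills to a helper like
this (checkpoint() is hypothetical, not part of the patch):

    import org.apache.hadoop.hbase.util.CancelableProgressable;

    final class ProgressCheck {
      private ProgressCheck() {}

      /** @return false if the reporter asked the split to stop. */
      static boolean checkpoint(CancelableProgressable reporter) {
        // A null reporter means non-distributed splitting: never cancel.
        return reporter == null || reporter.progress();
      }
    }

Note also the new outputSinkStarted guard: finishWritingAndClose() now runs
in the finally block only if the writer threads were actually started, so
an early cancellation no longer touches an unstarted sink.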

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Wed Apr 17 15:53:27 2013
@@ -38,15 +38,15 @@ import java.io.InterruptedIOException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSHDFSUtils extends FSUtils{
+public class FSHDFSUtils extends FSUtils {
   private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
 
   /**
    * Recover the lease from HDFS, retrying multiple times.
    */
   @Override
-  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
-      throws IOException {
+  public void recoverFileLease(final FileSystem fs, final Path p,
+      Configuration conf, CancelableProgressable reporter) throws IOException {
     // lease recovery not needed for local file system case.
     if (!(fs instanceof DistributedFileSystem)) {
       return;
@@ -81,6 +81,9 @@ public class FSHDFSUtils extends FSUtils
             ", retrying.", e);
       }
       if (!recovered) {
+        if (reporter != null && !reporter.progress()) {
+          throw new InterruptedIOException("Operation is cancelled");
+        }
         // try at least twice.
         if (nbAttempt > 2 && recoveryTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
           LOG.error("Can't recoverLease after " + nbAttempt + " attempts and " +

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java Wed Apr 17 15:53:27 2013
@@ -35,9 +35,9 @@ import org.apache.commons.logging.LogFac
 public class FSMapRUtils extends FSUtils {
   private static final Log LOG = LogFactory.getLog(FSMapRUtils.class);
   
-  public void recoverFileLease(final FileSystem fs, final Path p, 
-      Configuration conf) throws IOException {
-    LOG.info("Recovering file " + p.toString() + 
+  public void recoverFileLease(final FileSystem fs, final Path p,
+      Configuration conf, CancelableProgressable reporter) throws IOException {
+    LOG.info("Recovering file " + p.toString() +
       " by changing permission to readonly");
     FsPermission roPerm = new FsPermission((short) 0444);
     fs.setPermission(p, roPerm);
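
The MapR variant accepts the new reporter parameter only to keep the
FSUtils signature uniform; it never consults it, since its recovery is a
single fast setPermission() call with nothing long-running to heartbeat
through.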

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Wed Apr 17 15:53:27 2013
@@ -204,6 +204,7 @@ public abstract class FSUtils {
    * @return output stream to the created file
    * @throws IOException if the file cannot be created
    */
+  @SuppressWarnings("deprecation")
   public static FSDataOutputStream create(FileSystem fs, Path path,
       FsPermission perm, boolean overwrite) throws IOException {
     LOG.debug("Creating file=" + path + " with permission=" + perm);
@@ -761,6 +762,7 @@ public abstract class FSUtils {
    * @return true if exists
    * @throws IOException e
    */
+  @SuppressWarnings("deprecation")
   public static boolean metaRegionExists(FileSystem fs, Path rootdir)
   throws IOException {
     Path rootRegionDir =
@@ -1113,7 +1115,7 @@ public abstract class FSUtils {
    * @throws IOException
    */
   public abstract void recoverFileLease(final FileSystem fs, final Path p,
-      Configuration conf) throws IOException;
+      Configuration conf, CancelableProgressable reporter) throws IOException;
 
   /**
    * @param fs

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java Wed Apr 17 15:53:27 2013
@@ -753,7 +753,7 @@ public class RegionSplitter {
     } else {
       LOG.debug("_balancedSplit file found. Replay log to restore state...");
       FSUtils.getInstance(fs, table.getConfiguration())
-        .recoverFileLease(fs, splitFile, table.getConfiguration());
+        .recoverFileLease(fs, splitFile, table.getConfiguration(), null);
 
       // parse split file and process remaining splits
       FSDataInputStream tmpIn = fs.open(splitFile);

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Wed Apr 17 15:53:27 2013
@@ -159,6 +159,7 @@ public class TestDistributedLogSplitting
     for (HRegionInfo hri : regions) {
 
       Path tdir = HTableDescriptor.getTableDir(rootdir, table);
+      @SuppressWarnings("deprecation")
       Path editsdir =
         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Wed Apr 17 15:53:27 2013
@@ -459,7 +459,7 @@ public class TestHLog  {
       public void run() {
           try {
             FSUtils.getInstance(fs, rlConf)
-              .recoverFileLease(recoveredFs, walPath, rlConf);
+              .recoverFileLease(recoveredFs, walPath, rlConf, null);
           } catch (IOException e) {
             exception = e;
           }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1468981&r1=1468980&r2=1468981&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Apr 17 15:53:27 2013
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.LargeTest
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -918,6 +920,45 @@ public class TestHLogSplit {
     }
   }
 
+  @Test
+  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
+    fs.initialize(fs.getUri(), conf);
+
+    final AtomicInteger count = new AtomicInteger();
+    
+    CancelableProgressable localReporter
+      = new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          count.getAndIncrement();
+          return false;
+        }
+      };
+
+    FileSystem spiedFs = Mockito.spy(fs);
+    Mockito.doAnswer(new Answer<FSDataInputStream>() {
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(1500); // Sleep a while so the progress reporter gets invoked
+        return (FSDataInputStream)invocation.callRealMethod();
+      }
+    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+
+    try {
+      conf.setInt("hbase.splitlog.report.period", 1000);
+      HLogSplitter s = new HLogSplitter(conf, HBASEDIR, null, null, spiedFs, null);
+      boolean ret = s.splitLogFile(logfile, localReporter);
+      assertFalse("Log splitting should have failed", ret);
+      assertTrue(count.get() > 0);
+    } catch (IOException e) {
+      fail("No exception expected, but got: " + e.toString());
+    } finally {
+      // reset it back to its default value
+      conf.setInt("hbase.splitlog.report.period", 59000);
+    }
+  }
+
   /**
    * Test log split process with fake data and lots of edits to trigger threading
    * issues.
@@ -1000,8 +1041,8 @@ public class TestHLogSplit {
 
 
       /* Produce a mock reader that generates fake entries */
-      protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
-      throws IOException {
+      protected Reader getReader(FileSystem fs, Path curLogFile,
+          Configuration conf, CancelableProgressable reporter) throws IOException {
         Reader mockReader = Mockito.mock(Reader.class);
         Mockito.doAnswer(new Answer<HLog.Entry>() {
           int index = 0;