Posted to commits@hbase.apache.org by st...@apache.org on 2010/05/14 01:59:05 UTC

svn commit: r944063 - in /hadoop/hbase/trunk: ./ core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ core/src/test/java/org/apache/hadoop/hbase/ core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: stack
Date: Thu May 13 23:59:05 2010
New Revision: 944063

URL: http://svn.apache.org/viewvc?rev=944063&view=rev
Log:
HBASE-2544 Forward port branch 0.20 WAL to TRUNK

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=944063&r1=944062&r2=944063&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu May 13 23:59:05 2010
@@ -308,6 +308,7 @@ Release 0.21.0 - Unreleased
    HBASE-2431  Master does not respect generation stamps, may result in meta
                getting permanently offlined
    HBASE-2515  ChangeTableState considers split&&offline regions as being served
+   HBASE-2544  Forward port branch 0.20 WAL to TRUNK
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=944063&r1=944062&r2=944063&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu May 13 23:59:05 2010
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,34 +19,14 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.io.Writable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -66,6 +46,32 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
 /**
  * HLog stores all the edits to the HStore.  It's the HBase write-ahead-log
  * implementation.
@@ -120,20 +126,21 @@ public class HLog implements HConstants,
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
   private final Path oldLogDir;
 
-  public interface Reader {
+  private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
+  private int initialReplication;    // initial replication factor of SequenceFile.writer
+  private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
+  final static Object [] NO_ARGS = new Object []{};
 
-    void init(FileSystem fs, Path path, Configuration c) throws IOException;
+  // used to indirectly tell syncFs to force the sync
+  private boolean forceSync = false;
 
+  public interface Reader {
+    void init(FileSystem fs, Path path, Configuration c) throws IOException;
     void close() throws IOException;
-
     Entry next() throws IOException;
-
     Entry next(Entry reuse) throws IOException;
-
     void seek(long pos) throws IOException;
-
     long getPosition() throws IOException;
-
   }
 
   public interface Writer {
@@ -144,9 +151,6 @@ public class HLog implements HConstants,
     long getLength() throws IOException;
   }
 
-  // used to indirectly tell syncFs to force the sync
-  private boolean forceSync = false;
-
   /*
    * Current log file.
    */
@@ -168,11 +172,14 @@ public class HLog implements HConstants,
 
   private final AtomicLong logSeqNum = new AtomicLong(0);
 
+  // The timestamp (in ms) when the log file was created.
   private volatile long filenum = -1;
 
+  // Number of transactions in the current HLog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
-  // If > than this size, roll the log.
+  // If larger than this size, roll the log. This is typically 0.95 times
+  // the default HDFS block size.
   private final long logrollsize;
 
   // This lock prevents starting a log roll during a cache flush.
@@ -272,7 +279,7 @@ public class HLog implements HConstants,
     }
     fs.mkdirs(dir);
     this.oldLogDir = oldLogDir;
-    if(!fs.exists(oldLogDir)) {
+    if (!fs.exists(oldLogDir)) {
       fs.mkdirs(this.oldLogDir);
     }
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
@@ -282,7 +289,29 @@ public class HLog implements HConstants,
       ", enabled=" + this.enabled +
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
+    // rollWriter sets this.hdfs_out if it can.
     rollWriter();
+
+    // handle the reflection necessary to call getNumCurrentReplicas()
+    this.getNumCurrentReplicas = null;
+    if(this.hdfs_out != null) {
+      try {
+        this.getNumCurrentReplicas = this.hdfs_out.getClass().
+          getMethod("getNumCurrentReplicas", new Class<?> []{});
+        this.getNumCurrentReplicas.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        // Thrown if getNumCurrentReplicas() function isn't available
+      } catch (SecurityException e) {
+        // Thrown if we can't get access to getNumCurrentReplicas()
+        this.getNumCurrentReplicas = null; // could happen on setAccessible()
+      }
+    }
+    if(this.getNumCurrentReplicas != null) {
+      LOG.info("Using getNumCurrentReplicas--HDFS-826");
+    } else {
+      LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
+    }
+
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
     Threads.setDaemonThreadRunning(logSyncerThread,
         Thread.currentThread().getName() + ".logSyncer");
@@ -355,6 +384,17 @@ public class HLog implements HConstants,
         this.filenum = System.currentTimeMillis();
         Path newPath = computeFilename(this.filenum);
         this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
+        this.initialReplication = fs.getFileStatus(newPath).getReplication();
+
+        // Can we get at the dfsclient outputstream?  If an instance of
+        // SFLW, it'll have done the necessary reflection to get at the
+        // protected field name.
+        this.hdfs_out = null;
+        if (this.writer instanceof SequenceFileLogWriter) {
+          this.hdfs_out =
+            ((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
+        }
+
         LOG.info((oldFile != null?
             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
             this.numEntries.get() +
@@ -628,7 +668,7 @@ public class HLog implements HConstants,
   throws IOException {
     byte [] regionName = regionInfo.getRegionName();
     byte [] tableName = regionInfo.getTableDesc().getName();
-    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
+    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
   }
 
   /**
@@ -650,8 +690,7 @@ public class HLog implements HConstants,
    * @param logKey
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
-                     final boolean isMetaRegion)
+  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -672,7 +711,7 @@ public class HLog implements HConstants,
     }
 
     // sync txn to file system
-    this.sync(isMetaRegion);
+    this.sync(regionInfo.isMetaRegion());
   }
 
   /**
@@ -705,15 +744,14 @@ public class HLog implements HConstants,
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum = obtainSeqNum();
     synchronized (this.updateLock) {
+      long seqNum = obtainSeqNum();
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
      // memstore).  When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, seqNum);
-      int counter = 0;
       HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
       doWrite(info, logKey, edits);
       this.numEntries.incrementAndGet();
@@ -803,10 +841,6 @@ public class HLog implements HConstants,
           LOG.warn(getName() + " was shut down while waiting for sync");
           return;
         }
-        if (syncerShuttingDown) {
-          LOG.warn(getName() + " was shut down while waiting for sync");
-          return;
-        }
         if(force) {
           forceSync = true;
         }
@@ -852,7 +886,24 @@ public class HLog implements HConstants,
           syncOps++;
           this.forceSync = false;
           this.unflushedEntries.set(0);
-          // TODO: HBASE-2401
+
+          // if the number of replicas in HDFS has fallen below the initial
+          // value, then roll logs.
+          try {
+            int numCurrentReplicas = getLogReplication();
+            if (numCurrentReplicas != 0 &&
+                numCurrentReplicas < this.initialReplication) {
+              LOG.warn("HDFS pipeline error detected. " +
+                  "Found " + numCurrentReplicas + " replicas but expecting " +
+                  this.initialReplication + " replicas. " +
+                  " Requesting close of hlog.");
+              requestLogRoll();
+              logRollRequested = true;
+            }
+          } catch (Exception e) {
+              LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
+                       " still proceeding ahead...");
+          }
         } catch (IOException e) {
           LOG.fatal("Could not append. Requesting close of hlog", e);
           requestLogRoll();
@@ -866,6 +917,29 @@ public class HLog implements HConstants,
     }
   }
 
+  /**
+   * This method gets the datanode replication count for the current HLog.
+   *
+   * If the pipeline isn't started yet or is empty, you will get the default
+   * replication factor.  Therefore, if this function returns 0, it means you
+   * are not properly running with the HDFS-826 patch.
+   *
+   * @throws Exception
+   */
+  int getLogReplication() throws Exception {
+    if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
+      Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
+      if (repl instanceof Integer) {
+        return ((Integer)repl).intValue();
+      }
+    }
+    return 0;
+  }
+
+  boolean canGetCurReplicas() {
+    return this.getNumCurrentReplicas != null;
+  }
+
   public void hsync() throws IOException {
     // Not yet implemented up in hdfs so just call hflush.
     hflush();
@@ -916,20 +990,6 @@ public class HLog implements HConstants,
     return outputfiles.size();
   }
 
-  /*
-   * Obtain a specified number of sequence numbers
-   *
-   * @param num number of sequence numbers to obtain
-   * @return array of sequence numbers
-   */
-  private long [] obtainSeqNum(int num) {
-    long [] results = new long[num];
-    for (int i = 0; i < num; i++) {
-      results[i] = this.logSeqNum.incrementAndGet();
-    }
-    return results;
-  }
-
   /**
    * By acquiring a log sequence ID, we can allow log messages to continue while
    * we flush the cache.
@@ -968,10 +1028,10 @@ public class HLog implements HConstants,
       }
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
-        WALEdit edits = completeCacheFlushLogEdit();
-        this.writer.append(new HLog.Entry(
-          makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
-          edits));
+        WALEdit edit = completeCacheFlushLogEdit();
+        HLogKey key = makeKey(regionName, tableName, logSeqId,
+            System.currentTimeMillis());
+        this.writer.append(new Entry(key, edit));
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
@@ -1024,11 +1084,12 @@ public class HLog implements HConstants,
    *                <code>${ROOTDIR}/log_HOST_PORT</code>
    * @param oldLogDir
    * @param fs FileSystem
-   * @param conf HBaseConfiguration
+   * @param conf Configuration
    * @throws IOException
    */
   public static List<Path> splitLog(final Path rootDir, final Path srcDir,
-    Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException {
+    Path oldLogDir, final FileSystem fs, final Configuration conf)
+  throws IOException {
 
     long millis = System.currentTimeMillis();
     List<Path> splits = null;
@@ -1104,7 +1165,8 @@ public class HLog implements HConstants,
    */
   private static List<Path> splitLog(final Path rootDir,
     Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
-    final Configuration conf) throws IOException {
+    final Configuration conf)
+  throws IOException {
     final Map<byte [], WriterAndPath> logWriters =
       Collections.synchronizedMap(
         new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
@@ -1115,35 +1177,51 @@ public class HLog implements HConstants,
     int logWriterThreads =
       conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
 
-    // Number of logs to read concurrently when log splitting.
-    // More means faster but bigger mem consumption  */
-    int concurrentLogReads =
+    // Number of logs to read into memory before writing to their appropriate
+    // regions when log splitting.  More means faster but bigger mem consumption
+    int logFilesPerStep =
       conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
-    // Is append supported?
+
+    // append support = we can avoid data loss (yay)
+    // we open for append, then close to recover the correct file length
+    final boolean appendSupport = isAppend(conf);
+
+    // store corrupt logs for post-mortem analysis (empty string = discard)
+    final String corruptDir =
+      conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt");
+
+    List<Path> finishedFiles = new LinkedList<Path>();
+    List<Path> corruptFiles = new LinkedList<Path>();
+
     try {
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
-          concurrentLogReads)).intValue();
+          logFilesPerStep)).intValue();
       for (int step = 0; step < maxSteps; step++) {
+
+        // Step 1: read N log files into memory
         final Map<byte[], LinkedList<HLog.Entry>> logEntries =
           new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
-        // Stop at logfiles.length when it's the last step
         int endIndex = step == maxSteps - 1? logfiles.length:
-          step * concurrentLogReads + concurrentLogReads;
-        for (int i = (step * concurrentLogReads); i < endIndex; i++) {
-          // Check for possibly empty file. With appends, currently Hadoop
-          // reports a zero length even if the file has been sync'd. Revisit if
-          // HADOOP-4751 is committed.
-          long length = logfiles[i].getLen();
+          step * logFilesPerStep + logFilesPerStep;
+        for (int i = (step * logFilesPerStep); i < endIndex; i++) {
+          Path curLogFile = logfiles[i].getPath();
+
+          // make sure we get the right file length before opening for read
+          recoverLog(fs, curLogFile, appendSupport);
+
+          long length = fs.getFileStatus(curLogFile).getLen();
           if (LOG.isDebugEnabled()) {
             LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
-              ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+              ": " + curLogFile + ", length=" + length);
           }
+
           Reader in = null;
+          boolean cleanRead = false;
           int count = 0;
           try {
-            in = HLog.getReader(fs, logfiles[i].getPath(), conf);
+            in = HLog.getReader(fs, curLogFile, conf);
             try {
-              HLog.Entry entry;
+              Entry entry;
               while ((entry = in.next()) != null) {
                 byte [] regionName = entry.getKey().getRegionName();
                 LinkedList<HLog.Entry> queue = logEntries.get(regionName);
@@ -1155,20 +1233,24 @@ public class HLog implements HConstants,
                 queue.push(entry);
                 count++;
               }
-              LOG.debug("Pushed=" + count + " entries from " +
-                logfiles[i].getPath());
+              LOG.debug("Pushed=" + count + " entries from " + curLogFile);
+              cleanRead = true;
             } catch (IOException e) {
-              LOG.debug("IOE Pushed=" + count + " entries from " +
-                logfiles[i].getPath());
+              LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile);
               e = RemoteExceptionHandler.checkIOException(e);
               if (!(e instanceof EOFException)) {
-                LOG.warn("Exception processing " + logfiles[i].getPath() +
-                    " -- continuing. Possible DATA LOSS!", e);
+                String msg = "Exception processing " + curLogFile +
+                             " -- continuing. Possible DATA LOSS!";
+                if (corruptDir.length() > 0) {
+                  msg += "  Storing in hlog corruption directory.";
+                }
+                LOG.warn(msg, e);
               }
             }
           } catch (IOException e) {
             if (length <= 0) {
-              LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
+              LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+              cleanRead = true;
               continue;
             }
             throw e;
@@ -1178,8 +1260,16 @@ public class HLog implements HConstants,
                 in.close();
               }
             } catch (IOException e) {
-              LOG.warn("Close in finally threw exception -- continuing", e);
+              LOG.warn("File.close() threw exception -- continuing, "
+                     + "but marking file as corrupt.", e);
+              cleanRead = false;
             }
+            if (cleanRead) {
+              finishedFiles.add(curLogFile);
+            } else {
+              corruptFiles.add(curLogFile);
+            }
+            /* TODO FOR J-D REVIEW
             // Archive the input file now so we do not replay edits. We could
             // have gotten here because of an exception. If so, probably
             // nothing we can do about it. Replaying it, it could work but we
@@ -1187,85 +1277,89 @@ public class HLog implements HConstants,
             // could have lost some edits.
             fs.rename(logfiles[i].getPath(),
                 getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
+            */
           }
         }
+
+        // Step 2: Some regionserver log files have been read into memory.
+        //         Assign them to the appropriate region directory.
+        class ThreadWithException extends Thread {
+          ThreadWithException(String name) { super(name); }
+          public IOException exception = null;
+        }
+        List<ThreadWithException> threadList =
+          new ArrayList<ThreadWithException>(logEntries.size());
         ExecutorService threadPool =
           Executors.newFixedThreadPool(logWriterThreads);
-        for (final byte[] key : logEntries.keySet()) {
-          Thread thread = new Thread(Bytes.toStringBinary(key)) {
+        for (final byte [] region: logEntries.keySet()) {
+          ThreadWithException thread =
+              new ThreadWithException(Bytes.toStringBinary(region)) {
             @Override
             public void run() {
-              LinkedList<HLog.Entry> entries = logEntries.get(key);
+              LinkedList<HLog.Entry> entries = logEntries.get(region);
               LOG.debug("Thread got " + entries.size() + " to process");
+              if(entries.size() <= 0) {
+                LOG.warn("Got a region with no entries to process.");
+                return;
+              }
               long threadTime = System.currentTimeMillis();
               try {
                 int count = 0;
+                // get the logfile associated with this region.  2 logs often
+                // write to the same region, so persist this info across logs
+                WriterAndPath wap = logWriters.get(region);
+                if (wap == null) {
+                  // first write to this region, make new logfile
+                  assert entries.size() > 0;
+                  Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+                      .getTableDir(rootDir,
+                                   entries.getFirst().getKey().getTablename()),
+                      HRegionInfo.encodeRegionName(region)),
+                      HREGION_OLDLOGFILE_NAME);
+
+                  // If splitLog() was running when the user restarted his
+                  // cluster, then we could already have a 'logfile'.
+                  // Since we don't delete logs until everything is written to
+                  // their respective regions, we can safely remove this tmp.
+                  if (fs.exists(logfile)) {
+                    LOG.warn("Deleting old hlog file: " + logfile);
+                    // TODO: Archive? 
+                    fs.delete(logfile, true);
+                  }
+
+                  // associate an OutputStream with this logfile
+                  Writer w = createWriter(fs, logfile, conf);
+                  wap = new WriterAndPath(logfile, w);
+                  logWriters.put(region, wap);
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Creating new hlog file writer for path "
+                        + logfile + " and region " + Bytes.toStringBinary(region));
+                  }
+                }
+
                 // Items were added to the linkedlist oldest first. Pull them
                 // out in that order.
-                for (ListIterator<HLog.Entry> i =
-                  entries.listIterator(entries.size());
+                for (ListIterator<HLog.Entry> i = entries.listIterator(entries.size());
                     i.hasPrevious();) {
-                  HLog.Entry logEntry = i.previous();
-                  WriterAndPath wap = logWriters.get(key);
-                  if (wap == null) {
-                    Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
-                        .getTableDir(rootDir, logEntry.getKey().getTablename()),
-                        HRegionInfo.encodeRegionName(key)),
-                        HREGION_OLDLOGFILE_NAME);
-                    Path oldlogfile = null;
-                    Reader old = null;
-                    if (fs.exists(logfile)) {
-                      FileStatus stat = fs.getFileStatus(logfile);
-                      if (stat.getLen() <= 0) {
-                        LOG.warn("Old hlog file " + logfile + " is zero " +
-                          "length. Deleting existing file");
-                        fs.delete(logfile, false);
-                      } else {
-                        LOG.warn("Old hlog file " + logfile + " already " +
-                          "exists. Copying existing file to new file");
-                        oldlogfile = new Path(logfile.toString() + ".old");
-                        fs.rename(logfile, oldlogfile);
-                        old = getReader(fs, oldlogfile, conf);
-                      }
-                    }
-                    Writer w = createWriter(fs, logfile, conf);
-                    wap = new WriterAndPath(logfile, w);
-                    logWriters.put(key, wap);
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Creating new hlog file writer for path "
-                          + logfile + " and region " + Bytes.toStringBinary(key));
-                    }
-
-                    if (old != null) {
-                      // Copy from existing log file
-                      HLog.Entry entry;
-                      for (; (entry = old.next()) != null; count++) {
-                        if (LOG.isDebugEnabled() && count > 0
-                            && count % 10000 == 0) {
-                          LOG.debug("Copied " + count + " edits");
-                        }
-                        w.append(entry);
-                      }
-                      old.close();
-                      fs.delete(oldlogfile, true);
-                    }
-                  }
-                  wap.w.append(logEntry);
+                  wap.w.append(i.previous());
                   count++;
                 }
+
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("Applied " + count + " total edits to "
-                      + Bytes.toStringBinary(key) + " in "
+                      + Bytes.toStringBinary(region) + " in "
                       + (System.currentTimeMillis() - threadTime) + "ms");
                 }
               } catch (IOException e) {
                 e = RemoteExceptionHandler.checkIOException(e);
-                LOG.warn("Got while writing region " + Bytes.toStringBinary(key)
-                    + " log " + e);
+                LOG.warn("Got while writing region "
+                    + Bytes.toStringBinary(region) + " log " + e);
                 e.printStackTrace();
+                exception = e;
               }
             }
           };
+          threadList.add(thread);
           threadPool.execute(thread);
         }
         threadPool.shutdown();
@@ -1274,9 +1368,19 @@ public class HLog implements HConstants,
           for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
             LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
           }
-        }catch(InterruptedException ex) {
-          LOG.warn("Hlog writers were interrupted, possible data loss!");
+        } catch (InterruptedException ex) {
+          LOG.warn("Hlog writers were interrupted during splitLog().  "
+              +"Retaining log files to avoid data loss.");
+          throw new IOException(ex.getMessage(), ex.getCause());
+        }
+        // throw an exception if one of the threads reported one
+        for (ThreadWithException t : threadList) {
+          if (t.exception != null) {
+            throw t.exception;
+          }
         }
+
+        // End of for loop. Rinse and repeat
       }
     } finally {
       splits = new ArrayList<Path>(logWriters.size());
@@ -1286,9 +1390,79 @@ public class HLog implements HConstants,
         splits.add(wap.p);
       }
     }
+
+    // Step 3: All writes succeeded!  Get rid of the now-unnecessary logs
+    for(Path p : finishedFiles) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Successfully split Hlog file.  Deleting " + p);
+      }
+      try {
+        if (!fs.delete(p, true) && LOG.isDebugEnabled()) {
+          LOG.debug("Delete of split Hlog (" + p + ") failed.");
+        }
+      } catch (IOException e) {
+        // don't throw error here. worst case = double-read
+        LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e);
+      }
+    }
+    for (Path p : corruptFiles) {
+      if (corruptDir.length() > 0) {
+        // store any corrupt logs for later analysis
+        Path cp = new Path(conf.get(HBASE_DIR), corruptDir);
+        if(!fs.exists(cp)) {
+          fs.mkdirs(cp);
+        }
+        Path newp = new Path(cp, p.getName());
+        if (!fs.exists(newp)) {
+          if (!fs.rename(p, newp)) {
+            LOG.warn("Rename of " + p + " to " + newp + " failed.");
+          } else {
+            LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp);
+          }
+        } else {
+          LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp +
+                   ".  Ignoring");
+        }
+      } else {
+        // data loss is less important than disk space, delete
+        try {
+          if (!fs.delete(p, true) ) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Delete of split Hlog " + p + " failed.");
+            }
+          } else {
+            LOG.warn("Corrupt Hlog (" + p + ") deleted!");
+          }
+        } catch (IOException e) {
+          LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e);
+        }
+      }
+    }
+
     return splits;
   }
 
+  /*
+   * @param conf
+   * @return True if append enabled and we have the syncFs in our path.
+   */
+  static boolean isAppend(final Configuration conf) {
+    boolean append = conf.getBoolean("dfs.support.append", false);
+    if (append) {
+      try {
+        // TODO: The implementation that comes back when we do a createWriter
+        // may not be using SequenceFile so the below is not a definitive test.
+        // Will do for now (hdfs-200).
+        SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+        append = true;
+      } catch (SecurityException e) {
+      } catch (NoSuchMethodException e) {
+        append = false;
+      }
+    }
+    return append;
+  }
+
   /**
  * Utility class that lets us keep track of the edit with its key
    * Only used when splitting logs
@@ -1345,6 +1519,64 @@ public class HLog implements HConstants,
     }
   }
 
+  /*
+   * Recover log.
+   * Try and open log in append mode.
+   * Doing this, we get a hold of the file that the crashed writer
+   * was writing to.  Once we have it, close it.  This will
+   * allow a subsequent reader to see up to the last sync.
+   * @param fs
+   * @param p
+   * @param append
+   */
+  public static void recoverLog(final FileSystem fs, final Path p,
+      final boolean append) throws IOException {
+    if (!append) {
+      return;
+    }
+
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+
+    LOG.debug("Recovering DFS lease for path " + p);
+    long startWaiting = System.currentTimeMillis();
+
+    // Trying recovery
+    boolean recovered = false;
+    while (!recovered) {
+      try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        if (e instanceof AlreadyBeingCreatedException) {
+          // We expect that we'll get this message while the lease is still
+          // within its soft limit, but if we get it past that, it means
+          // that the RS is holding onto the file even though it lost its
+          // znode. We could potentially abort after some time here.
+          long waitedFor = System.currentTimeMillis() - startWaiting;
+
+          if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
+            LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p
+            + ":" + e.getMessage());
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            // ignore it and try again
+          }
+        } else {
+          throw new IOException("Failed to open " + p + " for append", e);
+        }
+      }
+    }
+    LOG.info("Past out lease recovery");
+  }
+
   /**
    * Construct the HLog directory name
    *

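For readers following the HDFS-826 changes above: stripped of HLog's
surrounding state, the replication probe reduces to the minimal sketch
below.  Only DFSOutputStream's getNumCurrentReplicas() is real; the
PipelineProbe class and its method names are illustrative, not part of
this commit.

    import java.io.OutputStream;
    import java.lang.reflect.Method;

    /** Minimal sketch of the HDFS-826 probe, assuming the stream handed in
     *  is the dfsclient output stream unwrapped by SequenceFileLogWriter. */
    class PipelineProbe {
      private final OutputStream out;
      private final Method getNumCurrentReplicas; // null if HDFS-826 absent

      PipelineProbe(final OutputStream out) {
        this.out = out;
        Method m = null;
        if (out != null) {
          try {
            m = out.getClass().getMethod("getNumCurrentReplicas",
              new Class<?> []{});
            m.setAccessible(true);
          } catch (NoSuchMethodException e) {
            // Running on an HDFS without HDFS-826; probe stays disabled.
          } catch (SecurityException e) {
            m = null; // setAccessible() refused
          }
        }
        this.getNumCurrentReplicas = m;
      }

      /** @return live replica count, or 0 when the probe is unavailable. */
      int replicas() throws Exception {
        if (this.getNumCurrentReplicas == null) return 0;
        Object r = this.getNumCurrentReplicas.invoke(this.out, new Object[]{});
        return (r instanceof Integer)? ((Integer)r).intValue(): 0;
      }

      /** @return true when replicas have fallen below what we started with. */
      boolean shouldRoll(final int initialReplication) throws Exception {
        final int n = replicas();
        return n != 0 && n < initialReplication;
      }
    }

This mirrors the check added in the sync path above: a datanode dropping
out of the write pipeline is noticed at sync time and triggers a log roll,
rather than waiting for the next size-based roll.
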
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=944063&r1=944062&r2=944063&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Thu May 13 23:59:05 2010
@@ -21,29 +21,42 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.DefaultCodec;
 
+/**
+ * Implementation of {@link HLog.Writer} that delegates to
+ * {@link SequenceFile.Writer}.
+ */
 public class SequenceFileLogWriter implements HLog.Writer {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  // The sequence file we delegate to.
+  private SequenceFile.Writer writer;
+  // The dfsclient out stream, made accessible via reflection, or null if
+  // not available.
+  private OutputStream dfsClient_out;
+  // The syncFs method from hdfs-200 or null if not available.
+  private Method syncFs;
 
-  SequenceFile.Writer writer;
-  FSDataOutputStream writer_out;
-
-  public SequenceFileLogWriter() { }
+  public SequenceFileLogWriter() {
+    super();
+  }
 
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
       throws IOException {
-    writer = SequenceFile.createWriter(fs, conf, path,
+    // Create a SF.Writer instance.
+    this.writer = SequenceFile.createWriter(fs, conf, path,
       HLog.getKeyClass(conf), WALEdit.class,
       fs.getConf().getInt("io.file.buffer.size", 4096),
       (short) conf.getInt("hbase.regionserver.hlog.replication",
@@ -58,19 +71,39 @@ public class SequenceFileLogWriter imple
     // Get at the private FSDataOutputStream inside in SequenceFile so we can
     // call sync on it.  Make it accessible.  Stash it aside for call up in
     // the sync method.
-    final Field fields[] = writer.getClass().getDeclaredFields();
+    final Field fields [] = this.writer.getClass().getDeclaredFields();
     final String fieldName = "out";
     for (int i = 0; i < fields.length; ++i) {
       if (fieldName.equals(fields[i].getName())) {
         try {
+          // Make the 'out' field up in SF.Writer accessible.
           fields[i].setAccessible(true);
-          this.writer_out = (FSDataOutputStream)fields[i].get(writer);
+          FSDataOutputStream out =
+            (FSDataOutputStream)fields[i].get(this.writer);
+          this.dfsClient_out = out.getWrappedStream();
           break;
         } catch (IllegalAccessException ex) {
           throw new IOException("Accessing " + fieldName, ex);
         }
       }
     }
+
+    // Now do the dirty work to see if syncFs (HDFS-200) is available.
+    Method m = null;
+    if (conf.getBoolean("dfs.support.append", false)) {
+      try {
+        // function pointer to writer.syncFs()
+        m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
+      } catch (SecurityException e) {
+        throw new IOException("Failed test for syncfs", e);
+      } catch (NoSuchMethodException e) {
+        // Not available
+      }
+    }
+    this.syncFs = m;
+    LOG.info((this.syncFs != null)?
+      "Using syncFs -- HDFS-200": "syncFs -- HDFS-200 -- not available");
   }
 
   @Override
@@ -86,12 +119,25 @@ public class SequenceFileLogWriter imple
   @Override
   public void sync() throws IOException {
     this.writer.sync();
-
-    this.writer.syncFs();
+    if (this.syncFs != null) {
+      try {
+        this.syncFs.invoke(this.writer, HLog.NO_ARGS);
+      } catch (Exception e) {
+        throw new IOException("Reflection", e);
+      }
+    }
   }
 
   @Override
   public long getLength() throws IOException {
     return this.writer.getLength();
   }
-}
+
+  /**
+   * @return The dfsclient out stream up inside SF.Writer made accessible, or
+   * null if not available.
+   */
+  public OutputStream getDFSCOutputStream() {
+    return this.dfsClient_out;
+  }
+}
\ No newline at end of file

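The field-grubbing in init() above generalizes to the small helper sketched
here.  FSDataOutputStream.getWrappedStream() is a real Hadoop method; the
Unwrap class and the unwrapDfsOut name are illustrative only.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.lang.reflect.Field;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.io.SequenceFile;

    final class Unwrap {
      private Unwrap() {}

      /** Reach the private 'out' field of a SequenceFile.Writer and return
       *  the wrapped dfsclient stream, or null if no such field exists. */
      static OutputStream unwrapDfsOut(final SequenceFile.Writer writer)
      throws IOException {
        for (final Field field : writer.getClass().getDeclaredFields()) {
          if ("out".equals(field.getName())) {
            try {
              field.setAccessible(true);
              final FSDataOutputStream out =
                (FSDataOutputStream)field.get(writer);
              return out.getWrappedStream();
            } catch (IllegalAccessException e) {
              throw new IOException("Accessing out", e);
            }
          }
        }
        return null;
      }
    }

Note this leans on a private field name inside SequenceFile.Writer, which
is why HLog treats a null stream as "HDFS-826 not available" and logs the
outcome instead of failing.
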
Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=944063&r1=944062&r2=944063&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Thu May 13 23:59:05 2010
@@ -28,15 +28,14 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -129,6 +128,7 @@ public class MiniHBaseCluster implements
    */
   public static class MiniHBaseClusterRegionServer extends HRegionServer {
     private static int index = 0;
+    private Thread shutdownThread = null;
 
     public MiniHBaseClusterRegionServer(Configuration conf)
         throws IOException {
@@ -159,11 +159,20 @@ public class MiniHBaseCluster implements
     @Override
     protected void init(MapWritable c) throws IOException {
       super.init(c);
-      // Change shutdown hook to only shutdown the FileSystem added above by
-      // {@link #getFileSystem(HBaseConfiguration)
-      if (getFileSystem() instanceof DistributedFileSystem) {
-        Thread t = new SingleFileSystemShutdownThread(getFileSystem());
-        Runtime.getRuntime().addShutdownHook(t);
+      // Run this thread to shutdown our filesystem on way out.
+      this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
+    }
+
+    @Override
+    public void run() {
+      try {
+        super.run();
+      } finally {
+        // Run this on the way out.
+        if (this.shutdownThread != null) {
+          this.shutdownThread.start();
+          Threads.shutdown(this.shutdownThread, 30000);
+        }
       }
     }
  

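The MiniHBaseCluster change swaps a JVM shutdown hook for cleanup that runs
when the region server thread itself exits.  The pattern, reduced to plain
Java as a sketch (the CleanupOnExit name is illustrative; in the patch,
Threads.shutdown performs the bounded join):

    /** Run some work, then start a cleanup thread and wait on it, bounded. */
    final class CleanupOnExit implements Runnable {
      private final Runnable work;    // stands in for super.run()
      private final Thread cleanup;   // e.g. SingleFileSystemShutdownThread

      CleanupOnExit(final Runnable work, final Thread cleanup) {
        this.work = work;
        this.cleanup = cleanup;
      }

      @Override
      public void run() {
        try {
          work.run();
        } finally {
          if (cleanup != null) {
            cleanup.start();
            try {
              cleanup.join(30000);  // bounded wait, as Threads.shutdown does
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
    }

Compared with a Runtime shutdown hook, this ties the filesystem shutdown to
the mini-cluster server's own lifetime, so tests that start and stop many
servers in one JVM do not accumulate hooks.
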
Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=944063&r1=944062&r2=944063&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu May 13 23:59:05 2010
@@ -19,6 +19,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -27,15 +32,10 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /** JUnit test case for HLog */
 public class TestHLog extends HBaseTestCase implements HConstants {
   private Path dir;
@@ -322,4 +322,70 @@ public class TestHLog extends HBaseTestC
       }
     }
   }
-}
+
+  /**
+   * @throws IOException
+   */
+  public void testAppend() throws IOException {
+    final int COL_COUNT = 10;
+    final byte [] tableName = Bytes.toBytes("tablename");
+    final byte [] row = Bytes.toBytes("row");
+    this.conf.setBoolean("dfs.support.append", true);
+    Reader reader = null;
+    HLog log = new HLog(this.fs, dir, this.oldLogDir, this.conf, null);
+    try {
+      // Write columns named 1, 2, 3, etc. and then values of single byte
+      // 1, 2, 3...
+      long timestamp = System.currentTimeMillis();
+      WALEdit cols = new WALEdit();
+      for (int i = 0; i < COL_COUNT; i++) {
+        cols.add(new KeyValue(row, Bytes.toBytes("column"),
+          Bytes.toBytes(Integer.toString(i)),
+          timestamp, new byte[] { (byte)(i + '0') }));
+      }
+      HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
+          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      log.append(hri, tableName, cols, System.currentTimeMillis());
+      long logSeqId = log.startCacheFlush();
+      log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
+      log.close();
+      Path filename = log.computeFilename(log.getFilenum());
+      log = null;
+      // Now open a reader on the log and assert append worked.
+      reader = HLog.getReader(fs, filename, conf);
+      HLog.Entry entry = reader.next();
+      assertEquals(COL_COUNT, entry.getEdit().size());
+      int idx = 0;
+      for (KeyValue val : entry.getEdit().getKeyValues()) {
+        assertTrue(Bytes.equals(hri.getRegionName(),
+          entry.getKey().getRegionName()));
+        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
+        assertTrue(Bytes.equals(row, val.getRow()));
+        assertEquals((byte)(idx + '0'), val.getValue()[0]);
+        System.out.println(entry.getKey() + " " + val);
+        idx++;
+      }
+
+      // Get next row... the meta flushed row.
+      entry = reader.next();
+      assertEquals(1, entry.getEdit().size());
+      for (KeyValue val : entry.getEdit().getKeyValues()) {
+        assertTrue(Bytes.equals(hri.getRegionName(),
+          entry.getKey().getRegionName()));
+        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
+        assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
+        assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
+        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
+          val.getValue()));
+        System.out.println(entry.getKey() + " " + val);
+      }
+    } finally {
+      if (log != null) {
+        log.closeAndDelete();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}
\ No newline at end of file