You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:48 UTC

[09/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index cd2b752..a0f571b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -89,10 +89,12 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -236,7 +238,7 @@ public class TestDistributedLogSplitting {
       }
       if (foundRs) break;
     }
-    final Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(hrs
+    final Path logDir = new Path(rootdir, DefaultWALProvider.getWALDirectoryName(hrs
         .getServerName().toString()));
 
     LOG.info("#regions = " + regions.size());
@@ -248,7 +250,8 @@ public class TestDistributedLogSplitting {
         it.remove();
       }
     }
-    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    
+    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
 
     slm.splitLogDistributed(logDir);
 
@@ -257,12 +260,13 @@ public class TestDistributedLogSplitting {
 
       Path tdir = FSUtils.getTableDir(rootdir, table);
       Path editsdir =
-        HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
       FileStatus[] files = fs.listStatus(editsdir);
-      assertTrue(files.length > 1);
+      assertTrue("edits dir should have more than a single file in it. instead has " + files.length,
+          files.length > 1);
       for (int i = 0; i < files.length; i++) {
-        int c = countHLog(files[i].getPath(), fs, conf);
+        int c = countWAL(files[i].getPath(), fs, conf);
         count += c;
       }
       LOG.info(count + " edits in " + files.length + " recovered edits files.");
@@ -291,7 +295,7 @@ public class TestDistributedLogSplitting {
 
     HRegionServer hrs = findRSToKill(false, "table");
     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
 
     // wait for abort completes
     this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
@@ -389,7 +393,7 @@ public class TestDistributedLogSplitting {
 
     HRegionServer hrs = findRSToKill(true, "table");
     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
 
     this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
     ht.close();
@@ -457,7 +461,7 @@ public class TestDistributedLogSplitting {
 
     HRegionServer hrs = findRSToKill(false, "table");
     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
 
     // abort master
     abortMaster(cluster);
@@ -514,7 +518,7 @@ public class TestDistributedLogSplitting {
 
     HRegionServer hrs = findRSToKill(false, "table");
     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
 
     // abort master
     abortMaster(cluster);
@@ -577,7 +581,7 @@ public class TestDistributedLogSplitting {
     HRegionServer hrs1 = findRSToKill(false, "table");
     regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices());
 
-    makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100);
 
     // abort RS1
     LOG.info("Aborting region server: " + hrs1.getServerName());
@@ -791,8 +795,8 @@ public class TestDistributedLogSplitting {
         it.remove();
       }
     }
-    makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
-    makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
+    makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
+    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);
 
     LOG.info("Disabling table\n");
     TEST_UTIL.getHBaseAdmin().disableTable(TableName.valueOf("disableTable"));
@@ -836,13 +840,13 @@ public class TestDistributedLogSplitting {
     Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
     for (HRegionInfo hri : regions) {
       Path editsdir =
-        HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       LOG.debug("checking edits dir " + editsdir);
       if(!fs.exists(editsdir)) continue;
       FileStatus[] files = fs.listStatus(editsdir);
       if(files != null) {
         for(FileStatus file : files) {
-          int c = countHLog(file.getPath(), fs, conf);
+          int c = countWAL(file.getPath(), fs, conf);
           count += c;
           LOG.info(c + " edits in " + file.getPath());
         }
@@ -857,7 +861,7 @@ public class TestDistributedLogSplitting {
     // clean up
     for (HRegionInfo hri : regions) {
       Path editsdir =
-        HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
+        WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
       fs.delete(editsdir, true);
     }
     disablingHT.close();
@@ -963,12 +967,12 @@ public class TestDistributedLogSplitting {
     HRegionServer hrs = findRSToKill(false, "table");
     Path rootdir = FSUtils.getRootDir(conf);
     final Path logDir = new Path(rootdir,
-        HLogUtil.getHLogDirectoryName(hrs.getServerName().toString()));
+        DefaultWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
 
     installTable(new ZooKeeperWatcher(conf, "table-creation", null),
         "table", "family", 40);
 
-    makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()),
+    makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()),
       "table", "family", NUM_LOG_LINES, 100);
 
     new Thread() {
@@ -1241,15 +1245,17 @@ public class TestDistributedLogSplitting {
     long timeStamp = System.currentTimeMillis();
     HTableDescriptor htd = new HTableDescriptor();
     htd.addFamily(new HColumnDescriptor(family));
+    final WAL wal = hrs.getWAL(curRegionInfo);
     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
       WALEdit e = new WALEdit();
       value++;
       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
-      hrs.getWAL().append(curRegionInfo, tableName, e,
-        System.currentTimeMillis(), htd, sequenceId);
+      wal.append(htd, curRegionInfo,
+          new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
+          e, sequenceId, true, null);
     }
-    hrs.getWAL().sync();
-    hrs.getWAL().close();
+    wal.sync();
+    wal.shutdown();
 
     // wait for abort completes
     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
@@ -1332,15 +1338,16 @@ public class TestDistributedLogSplitting {
     long timeStamp = System.currentTimeMillis();
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(new HColumnDescriptor(family));
+    final WAL wal = hrs.getWAL(curRegionInfo);
     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
       WALEdit e = new WALEdit();
       value++;
       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
-      hrs.getWAL().append(curRegionInfo, tableName, e,
-        System.currentTimeMillis(), htd, sequenceId);
+      wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
+          tableName, System.currentTimeMillis()), e, sequenceId, true, null);
     }
-    hrs.getWAL().sync();
-    hrs.getWAL().close();
+    wal.sync();
+    wal.shutdown();
 
     // wait for abort completes
     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
@@ -1380,27 +1387,24 @@ public class TestDistributedLogSplitting {
     FileSystem fs = master.getMasterFileSystem().getFileSystem();
     Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table"));
     List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
-    HLogUtil.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
+    WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
     // current SeqId file has seqid=1001
-    HLogUtil.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
+    WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L);
     // current SeqId file has seqid=2001
-    assertEquals(3001, HLogUtil.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 3L, 1000L));
-
-    Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regionDirs.get(0));
+    assertEquals(3001, WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));
+    
+    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
-        if (p.getName().endsWith(HLog.SEQUENCE_ID_FILE_SUFFIX)) {
-          return true;
-        }
-        return false;
+        return WALSplitter.isSequenceIdFile(p);
       }
     });
     // only one seqid file should exist
     assertEquals(1, files.length);
 
     // verify all seqId files aren't treated as recovered.edits files
-    NavigableSet<Path> recoveredEdits = HLogUtil.getSplitEditFilesSorted(fs, regionDirs.get(0));
+    NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0));
     assertEquals(0, recoveredEdits.size());
 
     ht.close();
@@ -1466,13 +1470,13 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
+  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
       int num_edits, int edit_size) throws IOException {
-    makeHLog(log, regions, tname, fname, num_edits, edit_size, true);
+    makeWAL(hrs, regions, tname, fname, num_edits, edit_size, true);
   }
 
-  public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
-      int num_edits, int edit_size, boolean closeLog) throws IOException {
+  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size, boolean cleanShutdown) throws IOException {
     TableName fullTName = TableName.valueOf(tname);
     // remove root and meta region
     regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
@@ -1504,10 +1508,13 @@ public class TestDistributedLogSplitting {
     }
     int n = hris.size();
     int[] counts = new int[n];
+    // sync every ~30k to line up with desired wal rolls; edit_size > 30k would make this 0
+    final int syncEvery = 30 * 1024 / edit_size;
     if (n > 0) {
       for (int i = 0; i < num_edits; i += 1) {
         WALEdit e = new WALEdit();
         HRegionInfo curRegionInfo = hris.get(i % n);
+        final WAL log = hrs.getWAL(curRegionInfo);
         byte[] startRow = curRegionInfo.getStartKey();
         if (startRow == null || startRow.length == 0) {
           startRow = new byte[] { 0, 0, 0, 0, 1 };
@@ -1518,13 +1525,25 @@ public class TestDistributedLogSplitting {
                                              // key
         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
         e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
-        log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
+        log.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
+            System.currentTimeMillis()), e, sequenceId, true, null);
+        if (0 == i % syncEvery) {
+          log.sync();
+        }
         counts[i % n] += 1;
       }
     }
-    log.sync();
-    if(closeLog) {
-      log.close();
+    // Done as two passes because regions might share a WAL: shutdown is idempotent, but a sync
+    // issued after shutdown will cause errors.
+    for (HRegionInfo info : hris) {
+      final WAL log = hrs.getWAL(info);
+      log.sync();
+    }
+    if (cleanShutdown) {
+      for (HRegionInfo info : hris) {
+        final WAL log = hrs.getWAL(info);
+        log.shutdown();
+      }
     }
     for (int i = 0; i < n; i++) {
       LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
@@ -1532,14 +1551,23 @@ public class TestDistributedLogSplitting {
     return;
   }
 
-  private int countHLog(Path log, FileSystem fs, Configuration conf)
+  private int countWAL(Path log, FileSystem fs, Configuration conf)
   throws IOException {
     int count = 0;
-    HLog.Reader in = HLogFactory.createReader(fs, log, conf);
-    HLog.Entry e;
-    while ((e = in.next()) != null) {
-      if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
-        count++;
+    WAL.Reader in = WALFactory.createReader(fs, log, conf);
+    try {
+      WAL.Entry e;
+      while ((e = in.next()) != null) {
+        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
+          count++;
+        }
+      }
+    } finally {
+      try {
+        in.close();
+      } catch (IOException exception) {
+        LOG.warn("Problem closing wal: " + exception.getMessage());
+        LOG.debug("exception details.", exception);
       }
     }
     return count;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index 8a90f48..84a3e07 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -924,7 +924,7 @@ public class TestMasterFailover {
       final HTableDescriptor htd)
   throws IOException {
     HRegion r = HRegion.createHRegion(hri, rootdir, c, htd);
-    // The above call to create a region will create an hlog file.  Each
+    // The above call to create a region will create a WAL file.  Each
     // log file create will also create a running thread to do syncing.  We need
     // to close out this log else we will have a running thread trying to sync
     // the file system continuously which is ugly when dfs is taken away at the

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
index e0b3782..5de76fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
@@ -98,7 +97,7 @@ public class TestSnapshotFileCache {
         "test-snapshot-file-cache-refresh", new SnapshotFileCache.SnapshotFileInspector() {
             public Collection<String> filesUnderSnapshot(final Path snapshotDir)
                 throws IOException {
-              return SnapshotReferenceUtil.getHLogNames(fs, snapshotDir);
+              return SnapshotReferenceUtil.getWALNames(fs, snapshotDir);
             }
         });
 
@@ -110,7 +109,7 @@ public class TestSnapshotFileCache {
     fs.createNewFile(file1);
 
     // and another file in the logs directory
-    Path logs = getSnapshotHLogsDir(snapshot, "server");
+    Path logs = SnapshotReferenceUtil.getLogsDir(snapshot, "server");
     Path log = new Path(logs, "me.hbase.com%2C58939%2C1350424310315.1350424315552");
     fs.createNewFile(log);
 
@@ -122,16 +121,6 @@ public class TestSnapshotFileCache {
     assertTrue("Cache didn't find:" + log, cache.contains(log.getName()));
   }
 
-  /**
-   * Get the log directory for a specific snapshot
-   * @param snapshotDir directory where the specific snapshot will be store
-   * @param serverName name of the parent regionserver for the log files
-   * @return path to the log home directory for the archive files.
-   */
-  public static Path getSnapshotHLogsDir(Path snapshotDir, String serverName) {
-    return new Path(snapshotDir, HLogUtil.getHLogDirectoryName(serverName));
-  }
-
   @Test
   public void testReloadModifiedDirectory() throws IOException {
     // don't refresh the cache unless we tell it to
@@ -169,7 +158,7 @@ public class TestSnapshotFileCache {
   class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
     public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
       Collection<String> files =  new HashSet<String>();
-      files.addAll(SnapshotReferenceUtil.getHLogNames(fs, snapshotDir));
+      files.addAll(SnapshotReferenceUtil.getWALNames(fs, snapshotDir));
       files.addAll(SnapshotReferenceUtil.getHFileNames(UTIL.getConfiguration(), fs, snapshotDir));
       return files;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index 1f48d5f..cd22d86 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -202,12 +202,12 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
   }
 
   @Override
-  public long getNumHLogFiles() {
+  public long getNumWALFiles() {
     return 10;
   }
 
   @Override
-  public long getHLogFileSize() {
+  public long getWALFileSize() {
     return 1024000;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 257579a..13e2f69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.Before;
@@ -606,7 +606,7 @@ public class TestAtomicOperation {
 
   public static class MockHRegion extends HRegion {
 
-    public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
+    public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
         final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
       super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 8699954..2d9c6c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -50,9 +50,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -125,7 +126,7 @@ public class TestCacheOnWriteInSchema {
   private final String testDescription;
   private HRegion region;
   private HStore store;
-  private HLog hlog;
+  private WALFactory walFactory;
   private FileSystem fs;
 
   public TestCacheOnWriteInSchema(CacheOnWriteType cowType) {
@@ -164,15 +165,16 @@ public class TestCacheOnWriteInSchema {
     htd.addFamily(hcd);
 
     // Create a store based on the schema
-    Path basedir = new Path(DIR);
-    String logName = "logs";
-    Path logdir = new Path(DIR, logName);
+    final String id = TestCacheOnWriteInSchema.class.getName();
+    final Path logdir = new Path(FSUtils.getRootDir(conf),
+        DefaultWALProvider.getWALDirectoryName(id));
     fs.delete(logdir, true);
 
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-    hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
+    walFactory = new WALFactory(conf, null, id);
 
-    region = TEST_UTIL.createLocalHRegion(info, htd, hlog);
+    region = TEST_UTIL.createLocalHRegion(info, htd,
+        walFactory.getWAL(info.getEncodedNameAsBytes()));
     store = new HStore(region, hcd, conf);
   }
 
@@ -186,7 +188,7 @@ public class TestCacheOnWriteInSchema {
       ex = e;
     }
     try {
-      hlog.closeAndDelete();
+      walFactory.close();
     } catch (IOException e) {
       LOG.warn("Caught Exception", e);
       ex = e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 6e70b3e..f9d6940 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -117,9 +117,9 @@ public class TestCompaction {
 
   @After
   public void tearDown() throws Exception {
-    HLog hlog = r.getLog();
+    WAL wal = r.getWAL();
     this.r.close();
-    hlog.closeAndDelete();
+    wal.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index 9a19524..5480e92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.After;
@@ -64,7 +64,7 @@ public class TestDefaultCompactSelection extends TestCase {
   protected static final long minSize = 10;
   protected static final long maxSize = 2100;
 
-  private HLog hlog;
+  private WALFactory wals;
   private HRegion region;
 
   @Override
@@ -79,9 +79,9 @@ public class TestDefaultCompactSelection extends TestCase {
     this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F);
 
     //Setting up a Store
+    final String id = TestDefaultCompactSelection.class.getName();
     Path basedir = new Path(DIR);
-    String logName = "logs";
-    Path logdir = new Path(DIR, logName);
+    final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(id));
     HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
     FileSystem fs = FileSystem.get(conf);
 
@@ -91,11 +91,14 @@ public class TestDefaultCompactSelection extends TestCase {
     htd.addFamily(hcd);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
 
-    hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
+    final Configuration walConf = new Configuration(conf);
+    FSUtils.setRootDir(walConf, basedir);
+    wals = new WALFactory(walConf, null, id);
     region = HRegion.createHRegion(info, basedir, conf, htd);
     HRegion.closeHRegion(region);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
-    region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
+    region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf, info, htd,
+        null);
 
     store = new HStore(region, hcd, conf);
 
@@ -113,7 +116,7 @@ public class TestDefaultCompactSelection extends TestCase {
       ex = e;
     }
     try {
-      hlog.closeAndDelete();
+      wals.close();
     } catch (IOException e) {
       LOG.warn("Caught Exception", e);
       ex = e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
index cd97f6b..b2eaa87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetClosestAtOrBefore.java
@@ -139,7 +139,7 @@ public class TestGetClosestAtOrBefore extends HBaseTestCase {
         } catch (Exception e) {
           e.printStackTrace();
         }
-        mr.getLog().closeAndDelete();
+        mr.getWAL().close();
       }
     }
   }
@@ -289,7 +289,7 @@ public class TestGetClosestAtOrBefore extends HBaseTestCase {
         } catch (Exception e) {
           e.printStackTrace();
         }
-        region.getLog().closeAndDelete();
+        region.getWAL().close();
       }
     }
   }
@@ -344,7 +344,7 @@ public class TestGetClosestAtOrBefore extends HBaseTestCase {
         } catch (Exception e) {
           e.printStackTrace();
         }
-        region.getLog().closeAndDelete();
+        region.getWAL().close();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 83aaec1..d8ee735 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -125,13 +125,17 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
-import org.apache.hadoop.hbase.regionserver.wal.FaultyHLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.FaultyFSLog;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -253,9 +257,9 @@ public class TestHRegion {
    */
   @Test (timeout=60000)
   public void testMemstoreSnapshotSize() throws IOException {
-    class MyFaultyHLog extends FaultyHLog {
+    class MyFaultyFSLog extends FaultyFSLog {
       StoreFlushContext storeFlushCtx;
-      public MyFaultyHLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
+      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
           throws IOException {
         super(fs, rootDir, logName, conf);
       }
@@ -273,7 +277,7 @@ public class TestHRegion {
 
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
-    MyFaultyHLog faultyLog = new MyFaultyHLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
+    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
       CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
 
@@ -284,7 +288,7 @@ public class TestHRegion {
 
     Put put = new Put(value);
     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
-    faultyLog.setFailureType(FaultyHLog.FailureType.SYNC);
+    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
 
     boolean threwIOE = false;
     try {
@@ -511,12 +515,13 @@ public class TestHRegion {
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
     this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, null, method);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
 
-      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
 
       long maxSeqId = 1050;
       long minSeqId = 1000;
@@ -524,13 +529,13 @@ public class TestHRegion {
       for (long i = minSeqId; i <= maxSeqId; i += 10) {
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
         fs.create(recoveredEdits);
-        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
 
         long time = System.nanoTime();
         WALEdit edit = new WALEdit();
         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
             .toBytes(i)));
-        writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+        writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
             HConstants.DEFAULT_CLUSTER_ID), edit));
 
         writer.close();
@@ -553,6 +558,7 @@ public class TestHRegion {
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
+      wals.close();
     }
   }
 
@@ -562,12 +568,13 @@ public class TestHRegion {
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
     this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, null, method);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
 
-      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
 
       long maxSeqId = 1050;
       long minSeqId = 1000;
@@ -575,13 +582,13 @@ public class TestHRegion {
       for (long i = minSeqId; i <= maxSeqId; i += 10) {
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
         fs.create(recoveredEdits);
-        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
 
         long time = System.nanoTime();
         WALEdit edit = new WALEdit();
         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
             .toBytes(i)));
-        writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+        writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
             HConstants.DEFAULT_CLUSTER_ID), edit));
 
         writer.close();
@@ -609,6 +616,7 @@ public class TestHRegion {
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
+      wals.close();
     }
   }
 
@@ -620,7 +628,7 @@ public class TestHRegion {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
 
-      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
       for (int i = 1000; i < 1050; i += 10) {
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
         FSDataOutputStream dos = fs.create(recoveredEdits);
@@ -650,6 +658,7 @@ public class TestHRegion {
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
     this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, null, method);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -658,7 +667,7 @@ public class TestHRegion {
       assertEquals(0, region.getStoreFileList(
         region.getStores().keySet().toArray(new byte[0][])).size());
 
-      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
 
       long maxSeqId = 1050;
       long minSeqId = 1000;
@@ -666,7 +675,7 @@ public class TestHRegion {
       for (long i = minSeqId; i <= maxSeqId; i += 10) {
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
         fs.create(recoveredEdits);
-        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
 
         long time = System.nanoTime();
         WALEdit edit = null;
@@ -684,7 +693,7 @@ public class TestHRegion {
           edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
             .toBytes(i)));
         }
-        writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+        writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
             HConstants.DEFAULT_CLUSTER_ID), edit));
         writer.close();
       }
@@ -705,7 +714,9 @@ public class TestHRegion {
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
-    }  }
+      wals.close();
+    }
+  }
 
   @Test
   public void testRecoveredEditsReplayCompaction() throws Exception {
@@ -713,6 +724,7 @@ public class TestHRegion {
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
     this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, null, method);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -757,18 +769,18 @@ public class TestHRegion {
           .getRegionInfo(), family, storeFiles, Lists.newArrayList(newFile), region
           .getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
-      HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
           this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1));
 
-      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
 
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
       fs.create(recoveredEdits);
-      HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
 
       long time = System.nanoTime();
 
-      writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
+      writer.append(new WAL.Entry(new HLogKey(regionName, tableName, 10, time,
           HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
           compactionDescriptor)));
       writer.close();
@@ -797,6 +809,7 @@ public class TestHRegion {
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
+      wals.close();
     }
   }
 
@@ -807,11 +820,13 @@ public class TestHRegion {
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
-    HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, "logs",
-      TEST_UTIL.getConfiguration());
+    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
+    FSUtils.setRootDir(walConf, logDir);
+    final WALFactory wals = new WALFactory(walConf, null, method);
+    final WAL wal = wals.getWAL(tableName.getName());
 
     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
-      HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
+      HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
     try {
       Path regiondir = region.getRegionFileSystem().getRegionDir();
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
@@ -835,59 +850,69 @@ public class TestHRegion {
       }
 
       // now verify that the flush markers are written
-      hlog.close();
-      HLog.Reader reader = HLogFactory.createReader(fs,
-        fs.listStatus(new Path(logDir, "logs"))[0].getPath(),
+      wal.shutdown();
+      WAL.Reader reader = wals.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
         TEST_UTIL.getConfiguration());
-
-      List<HLog.Entry> flushDescriptors = new ArrayList<HLog.Entry>();
-      long lastFlushSeqId = -1;
-      while (true) {
-        HLog.Entry entry = reader.next();
-        if (entry == null) {
-          break;
-        }
-        Cell cell = entry.getEdit().getCells().get(0);
-        if (WALEdit.isMetaEditFamily(cell)) {
-          FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
-          assertNotNull(flushDesc);
-          assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
-          if (flushDesc.getAction() == FlushAction.START_FLUSH) {
-            assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
-          } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
-            assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
-          }
-          lastFlushSeqId = flushDesc.getFlushSequenceNumber();
-          assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
-          assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
-          StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
-          assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
-          assertEquals("family", storeFlushDesc.getStoreHomeDir());
-          if (flushDesc.getAction() == FlushAction.START_FLUSH) {
-            assertEquals(0, storeFlushDesc.getFlushOutputCount());
-          } else {
-            assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
-            assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
+      try {
+        List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>();
+        long lastFlushSeqId = -1;
+        while (true) {
+          WAL.Entry entry = reader.next();
+          if (entry == null) {
+            break;
           }
+          Cell cell = entry.getEdit().getCells().get(0);
+          if (WALEdit.isMetaEditFamily(cell)) {
+            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
+            assertNotNull(flushDesc);
+            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
+            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
+            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
+              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
+            }
+            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
+            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
+            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
+            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
+            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
+            assertEquals("family", storeFlushDesc.getStoreHomeDir());
+            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
+              assertEquals(0, storeFlushDesc.getFlushOutputCount());
+            } else {
+              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
+              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
+            }
 
-          flushDescriptors.add(entry);
+            flushDescriptors.add(entry);
+          }
         }
-      }
 
-      assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
+        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
 
-      // now write those markers to the recovered edits again.
+        // now write those markers to the recovered edits again.
 
-      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+        Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
 
-      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
-      fs.create(recoveredEdits);
-      HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
+        fs.create(recoveredEdits);
+        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
 
-      for (HLog.Entry entry : flushDescriptors) {
-        writer.append(entry);
+        for (WAL.Entry entry : flushDescriptors) {
+          writer.append(entry);
+        }
+        writer.close();
+      } finally {
+        if (null != reader) {
+          try {
+            reader.close();
+          } catch (IOException exception) {
+            LOG.warn("Problem closing wal: " + exception.getMessage());
+            LOG.debug("exception details", exception);
+          }
+        }
       }
-      writer.close();
+
 
       // close the region now, and reopen again
       region.close();
@@ -903,6 +928,7 @@ public class TestHRegion {
     } finally {
       HRegion.closeHRegion(this.region);
       this.region = null;
+      wals.close();
     }
   }
 
@@ -952,11 +978,13 @@ public class TestHRegion {
     // spy an actual WAL implementation to throw exception (was not able to mock)
     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
 
-    HLog hlog = spy(HLogFactory.createHLog(FILESYSTEM, logDir, "logs",
-      TEST_UTIL.getConfiguration()));
+    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
+    FSUtils.setRootDir(walConf, logDir);
+    final WALFactory wals = new WALFactory(walConf, null, method);
+    WAL wal = spy(wals.getWAL(tableName.getName()));
 
     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
-      HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
+      HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
     try {
       int i = 0;
       Put put = new Put(Bytes.toBytes(i));
@@ -968,7 +996,7 @@ public class TestHRegion {
       IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH);
 
       // throw exceptions if the WalEdit is a start flush action
-      when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
+      when(wal.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
         (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
         (List<Cell>)any()))
           .thenThrow(new IOException("Fail to append flush marker"));
@@ -999,7 +1027,7 @@ public class TestHRegion {
 
       region.close();
       this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
-        HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
+        HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
       region.put(put);
 
       // 3. Test case where ABORT_FLUSH will throw exception.
@@ -4485,10 +4513,12 @@ public class TestHRegion {
     TableName tableName = TableName.valueOf(method);
     byte[] family = Bytes.toBytes("family");
     Path logDir = new Path(new Path(dir + method), "log");
-    HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), conf);
-    final HLog log = spy(hlog);
+    final Configuration walConf = new Configuration(conf);
+    FSUtils.setRootDir(walConf, logDir);
+    final WALFactory wals = new WALFactory(walConf, null, UUID.randomUUID().toString());
+    final WAL wal = spy(wals.getWAL(tableName.getName()));
     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
-        HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, log,
+        HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, wal,
         new byte[][] { family });
 
     Put put = new Put(Bytes.toBytes("r1"));
@@ -4497,8 +4527,8 @@ public class TestHRegion {
     region.put(put);
 
     //verify append called or not
-    verify(log, expectAppend ? times(1) : never())
-      .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
+    verify(wal, expectAppend ? times(1) : never())
+      .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
           (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<Cell>)any());
 
     // verify sync called or not
@@ -4508,9 +4538,9 @@ public class TestHRegion {
         public boolean evaluate() throws Exception {
           try {
             if (expectSync) {
-              verify(log, times(1)).sync(anyLong()); // Hregion calls this one
+              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
             } else if (expectSyncFromLogSyncer) {
-              verify(log, times(1)).sync(); // log syncer calls this one
+              verify(wal, times(1)).sync(); // wal syncer calls this one
             }
           } catch (Throwable ignore) {
           }
@@ -4518,8 +4548,8 @@ public class TestHRegion {
         }
       });
     } else {
-      //verify(log, never()).sync(anyLong());
-      verify(log, never()).sync();
+      //verify(wal, never()).sync(anyLong());
+      verify(wal, never()).sync();
     }
 
     HRegion.closeHRegion(this.region);
@@ -4531,7 +4561,7 @@ public class TestHRegion {
     // create a primary region, load some data and flush
     // create a secondary region, and do a get against that
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
-    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir); 
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4581,7 +4611,7 @@ public class TestHRegion {
     // create a primary region, load some data and flush
     // create a secondary region, and do a put against that
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
-    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4634,7 +4664,7 @@ public class TestHRegion {
   @Test
   public void testCompactionFromPrimary() throws IOException {
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
-    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4888,8 +4918,9 @@ public class TestHRegion {
    */
   private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
-      HLog hlog, byte[]... families) throws IOException {
-    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly, durability, hlog, families);
+      WAL wal, byte[]... families) throws IOException {
+    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
+        isReadOnly, durability, wal, families);
   }
 
   /**
@@ -5502,7 +5533,7 @@ public class TestHRegion {
     HRegionInfo hri = new HRegionInfo(htd.getTableName(),
       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
 
-    // open the region w/o rss and log and flush some files
+    // open the region w/o rss and wal and flush some files
     HRegion region =
          HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
             .getConfiguration(), htd);
@@ -5515,15 +5546,15 @@ public class TestHRegion {
 
     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
 
-    // capture appendNoSync() calls
-    HLog log = mock(HLog.class);
-    when(rss.getWAL((HRegionInfo) any())).thenReturn(log);
+    // capture append() calls
+    WAL wal = mock(WAL.class);
+    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
 
     try {
       region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
         TEST_UTIL.getConfiguration(), rss, null);
 
-      verify(log, times(1)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any()
+      verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
         , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
 
       WALEdit edit = editCaptor.getValue();
@@ -5575,9 +5606,9 @@ public class TestHRegion {
 
     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
 
-    // capture appendNoSync() calls
-    HLog log = mock(HLog.class);
-    when(rss.getWAL((HRegionInfo) any())).thenReturn(log);
+    // capture append() calls
+    WAL wal = mock(WAL.class);
+    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
 
     // open a region first so that it can be closed later
     region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
@@ -5587,7 +5618,7 @@ public class TestHRegion {
     region.close(false);
 
     // 2 times, one for region open, the other close region
-    verify(log, times(2)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
+    verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
       editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
 
     WALEdit edit = editCaptor.getAllValues().get(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index 0d8c164..b549dbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.Before;
@@ -109,9 +109,9 @@ public class TestMajorCompaction {
 
   @After
   public void tearDown() throws Exception {
-    HLog hlog = r.getLog();
+    WAL wal = r.getWAL();
     this.r.close();
-    hlog.closeAndDelete();
+    wal.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java
index 580251c..ed1ac3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinorCompaction.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.Before;
@@ -89,9 +89,9 @@ public class TestMinorCompaction {
 
   @After
   public void tearDown() throws Exception {
-    HLog hlog = r.getLog();
+    WAL wal = r.getWAL();
     this.r.close();
-    hlog.closeAndDelete();
+    wal.close();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
index 113d61c..2d4b656 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -44,9 +45,9 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
@@ -68,7 +69,7 @@ public class TestRegionMergeTransaction {
   private HRegion region_a;
   private HRegion region_b;
   private HRegion region_c;
-  private HLog wal;
+  private WALFactory wals;
   private FileSystem fs;
   // Start rows of region_a,region_b,region_c
   private static final byte[] STARTROW_A = new byte[] { 'a', 'a', 'a' };
@@ -81,11 +82,12 @@ public class TestRegionMergeTransaction {
   public void setup() throws IOException {
     this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
     this.fs.delete(this.testdir, true);
-    this.wal = HLogFactory.createHLog(fs, this.testdir, "logs",
-        TEST_UTIL.getConfiguration());
-    this.region_a = createRegion(this.testdir, this.wal, STARTROW_A, STARTROW_B);
-    this.region_b = createRegion(this.testdir, this.wal, STARTROW_B, STARTROW_C);
-    this.region_c = createRegion(this.testdir, this.wal, STARTROW_C, ENDROW);
+    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
+    FSUtils.setRootDir(walConf, this.testdir);
+    this.wals = new WALFactory(walConf, null, TestRegionMergeTransaction.class.getName());
+    this.region_a = createRegion(this.testdir, this.wals, STARTROW_A, STARTROW_B);
+    this.region_b = createRegion(this.testdir, this.wals, STARTROW_B, STARTROW_C);
+    this.region_c = createRegion(this.testdir, this.wals, STARTROW_C, ENDROW);
     assert region_a != null && region_b != null && region_c != null;
     TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true);
   }
@@ -100,8 +102,9 @@ public class TestRegionMergeTransaction {
             + region.getRegionFileSystem().getRegionDir());
       }
     }
-    if (this.wal != null)
-      this.wal.closeAndDelete();
+    if (this.wals != null) {
+      this.wals.close();
+    }
     this.fs.delete(this.testdir, true);
   }
 
@@ -400,7 +403,7 @@ public class TestRegionMergeTransaction {
   private class MockedFailedMergedRegionOpen extends IOException {
   }
 
-  private HRegion createRegion(final Path testdir, final HLog wal,
+  private HRegion createRegion(final Path testdir, final WALFactory wals,
       final byte[] startrow, final byte[] endrow)
       throws IOException {
     // Make a region with start and end keys.
@@ -411,7 +414,7 @@ public class TestRegionMergeTransaction {
     HRegion a = HRegion.createHRegion(hri, testdir,
         TEST_UTIL.getConfiguration(), htd);
     HRegion.closeHRegion(a);
-    return HRegion.openHRegion(testdir, hri, htd, wal,
+    return HRegion.openHRegion(testdir, hri, htd, wals.getWAL(hri.getEncodedNameAsBytes()),
         TEST_UTIL.getConfiguration());
   }
 
@@ -456,7 +459,7 @@ public class TestRegionMergeTransaction {
           }
           Put put = new Put(k);
           put.add(f, null, k);
-          if (r.getLog() == null)
+          if (r.getWAL() == null)
             put.setDurability(Durability.SKIP_WAL);
           r.put(put);
           rowCount++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
index 6ea4399..cbef1ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -53,9 +54,9 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -76,7 +77,7 @@ public class TestSplitTransaction {
   private final Path testdir =
     TEST_UTIL.getDataTestDir(this.getClass().getName());
   private HRegion parent;
-  private HLog wal;
+  private WALFactory wals;
   private FileSystem fs;
   private static final byte [] STARTROW = new byte [] {'a', 'a', 'a'};
   // '{' is next ascii after 'z'.
@@ -91,10 +92,11 @@ public class TestSplitTransaction {
     this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
     TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
     this.fs.delete(this.testdir, true);
-    this.wal = HLogFactory.createHLog(fs, this.testdir, "logs",
-      TEST_UTIL.getConfiguration());
+    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
+    FSUtils.setRootDir(walConf, this.testdir);
+    this.wals = new WALFactory(walConf, null, this.getClass().getName());
     
-    this.parent = createRegion(this.testdir, this.wal);
+    this.parent = createRegion(this.testdir, this.wals);
     RegionCoprocessorHost host = new RegionCoprocessorHost(this.parent, null, TEST_UTIL.getConfiguration());
     this.parent.setCoprocessorHost(host);
     TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true);
@@ -106,7 +108,9 @@ public class TestSplitTransaction {
     if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
       throw new IOException("Failed delete of " + regionDir);
     }
-    if (this.wal != null) this.wal.closeAndDelete();
+    if (this.wals != null) {
+      this.wals.close();
+    }
     this.fs.delete(this.testdir, true);
   }
 
@@ -364,7 +368,7 @@ public class TestSplitTransaction {
     return rowcount;
   }
 
-  HRegion createRegion(final Path testdir, final HLog wal)
+  HRegion createRegion(final Path testdir, final WALFactory wals)
   throws IOException {
     // Make a region with start and end keys. Use 'aaa', to 'AAA'.  The load
     // region utility will add rows between 'aaa' and 'zzz'.
@@ -374,7 +378,7 @@ public class TestSplitTransaction {
     HRegionInfo hri = new HRegionInfo(htd.getTableName(), STARTROW, ENDROW);
     HRegion r = HRegion.createHRegion(hri, testdir, TEST_UTIL.getConfiguration(), htd);
     HRegion.closeHRegion(r);
-    return HRegion.openHRegion(testdir, hri, htd, wal,
+    return HRegion.openHRegion(testdir, hri, htd, wals.getWAL(hri.getEncodedNameAsBytes()),
       TEST_UTIL.getConfiguration());
   }
   

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index a6f13f0..f892be6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -167,8 +167,7 @@ public class TestStore {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
-    String logName = "logs";
-    Path logdir = new Path(basedir, logName);
+    final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
 
     FileSystem fs = FileSystem.get(conf);
 
@@ -180,8 +179,11 @@ public class TestStore {
       htd.addFamily(hcd);
     }
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-    HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
-    HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
+    final Configuration walConf = new Configuration(conf);
+    FSUtils.setRootDir(walConf, basedir);
+    final WALFactory wals = new WALFactory(walConf, null, methodName);
+    HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
+        info, htd, null);
 
     store = new HStore(region, hcd, conf);
     return store;
@@ -782,7 +784,7 @@ public class TestStore {
         LOG.info("After failed flush, we should still have no files!");
         files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
         Assert.assertEquals(0, files != null ? files.size() : 0);
-        store.getHRegion().getLog().closeAndDelete();
+        store.getHRegion().getWAL().close();
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index be50ed1..781102c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
@@ -61,10 +61,10 @@ public class TestStoreFileRefresherChore {
   private Path testDir;
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     TEST_UTIL = new HBaseTestingUtility();
     testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore");
-    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
+    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), testDir);
   }
 
   private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
@@ -100,8 +100,10 @@ public class TestStoreFileRefresherChore {
     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
 
     HRegionFileSystem fs = new FailingHRegionFileSystem(conf, tableDir.getFileSystem(conf), tableDir, info);
-    HRegion region = new HRegion(fs, HLogFactory.createHLog(fs.getFileSystem(),
-      tableDir, "log_" + replicaId, conf), conf, htd, null);
+    final Configuration walConf = new Configuration(conf);
+    FSUtils.setRootDir(walConf, tableDir);
+    final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId);
+    HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes()), conf, htd, null);
 
     region.initialize();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java
deleted file mode 100644
index 10ba82f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-
-/*
- * This is a utility class, used by tests, which fails operation specified by FailureType enum
- */
-public class FaultyHLog extends FSHLog {
-  public enum FailureType {
-    NONE, APPENDNOSYNC, SYNC
-  }
-  FailureType ft = FailureType.NONE;
-
-  public FaultyHLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
-      throws IOException {
-    super(fs, rootDir, logName, conf);
-  }
-  
-  public void setFailureType(FailureType fType) {
-    this.ft = fType;
-  }
-  
-  @Override
-  public void sync(long txid) throws IOException {
-    if (this.ft == FailureType.SYNC) {
-      throw new IOException("sync");
-    }
-    super.sync(txid);
-  }
-  @Override
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
-      boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
-    if (this.ft == FailureType.APPENDNOSYNC) {
-      throw new IOException("appendNoSync");
-    }
-    return super.appendNoSync(info, tableName, edits, clusterIds, now, htd, sequenceId,
-      isInMemstore, nonceGroup, nonce);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
index 2164a43..a0e4490 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
@@ -23,12 +23,12 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.Queue;
 
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 public class FaultySequenceFileLogReader extends SequenceFileLogReader {
 
-  enum FailureType {
+  // public until class relocates to o.a.h.h.wal
+  public enum FailureType {
     BEGINNING, MIDDLE, END, NONE
   }
 
@@ -40,17 +40,17 @@ public class FaultySequenceFileLogReader extends SequenceFileLogReader {
   }
 
   @Override
-  public HLog.Entry next(HLog.Entry reuse) throws IOException {
+  public Entry next(Entry reuse) throws IOException {
     this.entryStart = this.getPosition();
     boolean b = true;
 
     if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading
       while (b == true) {
-        HLog.Entry e = new HLog.Entry(new HLogKey(), new WALEdit());
+        Entry e = new Entry(new HLogKey(), new WALEdit());
         if (compressionContext != null) {
           e.setCompressionContext(compressionContext);
         }
-        b = this.reader.next(e.getKey(), e.getEdit());
+        b = readNext(e);
         nextQueue.offer(e);
         numberOfFileEntries++;
       }