You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:57 UTC

[18/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index db66f5c..c3e8650 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.ObjectName;
@@ -124,9 +125,10 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
@@ -324,15 +326,13 @@ public class HRegionServer extends HasThread implements
    */
   Chore periodicFlusher;
 
-  // HLog and HLog roller. log is protected rather than private to avoid
-  // eclipse warning when accessed by inner classes
-  protected volatile HLog hlog;
-  // The meta updates are written to a different hlog. If this
-  // regionserver holds meta regions, then this field will be non-null.
-  protected volatile HLog hlogForMeta;
+  protected volatile WALFactory walFactory;
 
-  LogRoller hlogRoller;
-  LogRoller metaHLogRoller;
+  // WAL roller. log is protected rather than private to avoid
+  // eclipse warning when accessed by inner classes
+  final LogRoller walRoller;
+  // Lazily initialized if this RegionServer hosts a meta table.
+  final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
 
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
@@ -543,6 +543,7 @@ public class HRegionServer extends HasThread implements
 
     rpcServices.start();
     putUpWebUI();
+    this.walRoller = new LogRoller(this, this);
   }
 
   protected void login(UserProvider user, String host) throws IOException {
@@ -971,7 +972,7 @@ public class HRegionServer extends HasThread implements
 
     //fsOk flag may be changed when closing regions throws exception.
     if (this.fsOk) {
-      closeWAL(!abortRequested);
+      shutdownWAL(!abortRequested);
     }
 
     // Make sure the proxy is down.
@@ -1073,7 +1074,8 @@ public class HRegionServer extends HasThread implements
     }
   }
 
-  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
+      throws IOException {
     // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
     // per second, and other metrics  As long as metrics are part of ServerLoad it's best to use
     // the wrapper to compute those numbers in one place.
@@ -1092,7 +1094,7 @@ public class HRegionServer extends HasThread implements
     serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
-    Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
+    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
     for (String coprocessor : coprocessors) {
       serverLoad.addCoprocessors(
         Coprocessor.newBuilder().setName(coprocessor).build());
@@ -1101,6 +1103,10 @@ public class HRegionServer extends HasThread implements
     RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
     for (HRegion region : regions) {
       serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
+      for (String coprocessor :
+          getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
+        serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
+      }
     }
     serverLoad.setReportStartTime(reportStartTime);
     serverLoad.setReportEndTime(reportEndTime);
@@ -1189,33 +1195,24 @@ public class HRegionServer extends HasThread implements
     return interrupted;
   }
 
-  private void closeWAL(final boolean delete) {
-    if (this.hlogForMeta != null) {
-      // All hlogs (meta and non-meta) are in the same directory. Don't call
-      // closeAndDelete here since that would delete all hlogs not just the
-      // meta ones. We will just 'close' the hlog for meta here, and leave
-      // the directory cleanup to the follow-on closeAndDelete call.
-      try {
-        this.hlogForMeta.close();
-      } catch (Throwable e) {
-        LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
-      }
-    }
-    if (this.hlog != null) {
+  private void shutdownWAL(final boolean close) {
+    if (this.walFactory != null) {
       try {
-        if (delete) {
-          hlog.closeAndDelete();
+        if (close) {
+          walFactory.close();
         } else {
-          hlog.close();
+          walFactory.shutdown();
         }
       } catch (Throwable e) {
-        LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
+        e = RemoteExceptionHandler.checkThrowable(e);
+        LOG.error("Shutdown / close of WAL failed: " + e);
+        LOG.debug("Shutdown / close exception details:", e);
       }
     }
   }
 
   /*
-   * Run init. Sets up hlog and starts up all server threads.
+   * Run init. Sets up wal and starts up all server threads.
    *
    * @param c Extra configuration.
    */
@@ -1253,7 +1250,7 @@ public class HRegionServer extends HasThread implements
       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
 
       this.cacheConfig = new CacheConfig(conf);
-      this.hlog = setupWALAndReplication();
+      this.walFactory = setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
 
@@ -1497,10 +1494,10 @@ public class HRegionServer extends HasThread implements
    * @return A WAL instance.
    * @throws IOException
    */
-  private HLog setupWALAndReplication() throws IOException {
+  private WALFactory setupWALAndReplication() throws IOException {
+    // TODO Replication make assumptions here based on the default filesystem impl
     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    final String logName
-      = HLogUtil.getHLogDirectoryName(this.serverName.toString());
+    final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
 
     Path logdir = new Path(rootDir, logName);
     if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
@@ -1513,66 +1510,44 @@ public class HRegionServer extends HasThread implements
     // log directories.
     createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
 
-    return instantiateHLog(rootDir, logName);
-  }
-
-  private HLog getMetaWAL() throws IOException {
-    if (this.hlogForMeta != null) return this.hlogForMeta;
-    final String logName = HLogUtil.getHLogDirectoryName(this.serverName.toString());
-    Path logdir = new Path(rootDir, logName);
-    if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
-    this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
-      this.conf, getMetaWALActionListeners(), this.serverName.toString());
-    return this.hlogForMeta;
-  }
-
-  /**
-   * Called by {@link #setupWALAndReplication()} creating WAL instance.
-   * @param rootdir
-   * @param logName
-   * @return WAL instance.
-   * @throws IOException
-   */
-  protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
-    return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
-      getWALActionListeners(), this.serverName.toString());
-  }
-
-  /**
-   * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance.
-   * Add any {@link WALActionsListener}s you want inserted before WAL startup.
-   * @return List of WALActionsListener that will be passed in to
-   * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction.
-   */
-  protected List<WALActionsListener> getWALActionListeners() {
-    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
-    // Log roller.
-    this.hlogRoller = new LogRoller(this, this);
-    listeners.add(this.hlogRoller);
+    // listeners the wal factory will add to wals it creates.
+    final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+    listeners.add(new MetricsWAL());
     if (this.replicationSourceHandler != null &&
         this.replicationSourceHandler.getWALActionsListener() != null) {
       // Replication handler is an implementation of WALActionsListener.
       listeners.add(this.replicationSourceHandler.getWALActionsListener());
     }
-    return listeners;
+
+    return new WALFactory(conf, listeners, serverName.toString());
   }
 
-  protected List<WALActionsListener> getMetaWALActionListeners() {
-    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+  /**
+   * We initialize the roller for the wal that handles meta lazily
+   * since we don't know if this regionserver will handle it. All calls to
+   * this method return a reference to the that same roller. As newly referenced
+   * meta regions are brought online, they will be offered to the roller for maintenance.
+   * As a part of that registration process, the roller will add itself as a
+   * listener on the wal.
+   */
+  protected LogRoller ensureMetaWALRoller() {
     // Using a tmp log roller to ensure metaLogRoller is alive once it is not
     // null
-    MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
-    String n = Thread.currentThread().getName();
-    Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
-        n + "-MetaLogRoller", uncaughtExceptionHandler);
-    this.metaHLogRoller = tmpLogRoller;
-    tmpLogRoller = null;
-    listeners.add(this.metaHLogRoller);
-    return listeners;
-  }
-
-  protected LogRoller getLogRoller() {
-    return hlogRoller;
+    LogRoller roller = metawalRoller.get();
+    if (null == roller) {
+      LogRoller tmpLogRoller = new LogRoller(this, this);
+      String n = Thread.currentThread().getName();
+      Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
+          n + "-MetaLogRoller", uncaughtExceptionHandler);
+      if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
+        roller = tmpLogRoller;
+      } else {
+        // Another thread won starting the roller
+        Threads.shutdown(tmpLogRoller.getThread());
+        roller = metawalRoller.get();
+      }
+    }
+    return roller;
   }
 
   public MetricsRegionServer getRegionServerMetrics() {
@@ -1615,7 +1590,7 @@ public class HRegionServer extends HasThread implements
     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
 
-    Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
+    Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
         uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
@@ -1662,7 +1637,7 @@ public class HRegionServer extends HasThread implements
     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
       conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
     sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
-    this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
+    this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
     splitLogWorker.start();
   }
 
@@ -1725,38 +1700,37 @@ public class HRegionServer extends HasThread implements
     }
     // Verify that all threads are alive
     if (!(leases.isAlive()
-        && cacheFlusher.isAlive() && hlogRoller.isAlive()
+        && cacheFlusher.isAlive() && walRoller.isAlive()
         && this.compactionChecker.isAlive()
         && this.periodicFlusher.isAlive())) {
       stop("One or more threads are no longer alive -- stop");
       return false;
     }
-    if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
-      stop("Meta HLog roller thread is no longer alive -- stop");
+    final LogRoller metawalRoller = this.metawalRoller.get();
+    if (metawalRoller != null && !metawalRoller.isAlive()) {
+      stop("Meta WAL roller thread is no longer alive -- stop");
       return false;
     }
     return true;
   }
 
-  public HLog getWAL() {
-    try {
-      return getWAL(null);
-    } catch (IOException e) {
-      LOG.warn("getWAL threw exception " + e);
-      return null;
-    }
-  }
+  private static final byte[] UNSPECIFIED_REGION = new byte[]{};
 
   @Override
-  public HLog getWAL(HRegionInfo regionInfo) throws IOException {
-    //TODO: at some point this should delegate to the HLogFactory
-    //currently, we don't care about the region as much as we care about the
-    //table.. (hence checking the tablename below)
+  public WAL getWAL(HRegionInfo regionInfo) throws IOException {
+    WAL wal;
+    LogRoller roller = walRoller;
     //_ROOT_ and hbase:meta regions have separate WAL.
     if (regionInfo != null && regionInfo.isMetaTable()) {
-      return getMetaWAL();
+      roller = ensureMetaWALRoller();
+      wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
+    } else if (regionInfo == null) {
+      wal = walFactory.getWAL(UNSPECIFIED_REGION);
+    } else {
+      wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
     }
-    return this.hlog;
+    roller.addWAL(wal);
+    return wal;
   }
 
   @Override
@@ -1982,11 +1956,12 @@ public class HRegionServer extends HasThread implements
     if (this.spanReceiverHost != null) {
       this.spanReceiverHost.closeReceivers();
     }
-    if (this.hlogRoller != null) {
-      Threads.shutdown(this.hlogRoller.getThread());
+    if (this.walRoller != null) {
+      Threads.shutdown(this.walRoller.getThread());
     }
-    if (this.metaHLogRoller != null) {
-      Threads.shutdown(this.metaHLogRoller.getThread());
+    final LogRoller metawalRoller = this.metawalRoller.get();
+    if (metawalRoller != null) {
+      Threads.shutdown(metawalRoller.getThread());
     }
     if (this.compactSplitThread != null) {
       this.compactSplitThread.join();
@@ -2524,11 +2499,24 @@ public class HRegionServer extends HasThread implements
 
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
   public String[] getRegionServerCoprocessors() {
-    TreeSet<String> coprocessors = new TreeSet<String>(
-        this.hlog.getCoprocessorHost().getCoprocessors());
+    TreeSet<String> coprocessors = new TreeSet<String>();
+    try {
+      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
+    } catch (IOException exception) {
+      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
+          "skipping.");
+      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
+    }
     Collection<HRegion> regions = getOnlineRegionsLocalContext();
     for (HRegion region: regions) {
       coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
+      try {
+        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
+      } catch (IOException exception) {
+        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
+            "; skipping.");
+        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
+      }
     }
     return coprocessors.toArray(new String[coprocessors.size()]);
   }
@@ -2665,16 +2653,22 @@ public class HRegionServer extends HasThread implements
     HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
 
     if (destination != null) {
-      HLog wal = getWAL();
-      long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
-      if (closeSeqNum == HConstants.NO_SEQNUM) {
-        // No edits in WAL for this region; get the sequence number when the region was opened.
-        closeSeqNum = r.getOpenSeqNum();
+      try {
+        WAL wal = getWAL(r.getRegionInfo());
+        long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
         if (closeSeqNum == HConstants.NO_SEQNUM) {
-          closeSeqNum = 0;
+          // No edits in WAL for this region; get the sequence number when the region was opened.
+          closeSeqNum = r.getOpenSeqNum();
+          if (closeSeqNum == HConstants.NO_SEQNUM) {
+            closeSeqNum = 0;
+          }
         }
+        addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
+      } catch (IOException exception) {
+        LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
+            "; not adding to moved regions.");
+        LOG.debug("Exception details for failure to get wal", exception);
       }
-      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
     }
     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
     return toReturn != null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index eb51c50..2adaaba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -79,7 +79,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1216,7 +1216,7 @@ public class HStore implements Store {
    */
   private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
       Collection<StoreFile> newFiles) throws IOException {
-    if (region.getLog() == null) return;
+    if (region.getWAL() == null) return;
     List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
     for (StoreFile f : filesCompacted) {
       inputPaths.add(f.getPath());
@@ -1228,7 +1228,7 @@ public class HStore implements Store {
     HRegionInfo info = this.region.getRegionInfo();
     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
-    HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+    WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
         this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 8179c98..821756d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,15 +18,20 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -36,17 +41,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Runs periodically to determine if the HLog should be rolled.
+ * Runs periodically to determine if the WAL should be rolled.
  *
  * NOTE: This class extends Thread rather than Chore because the sleep time
  * can be interrupted when there is something to do, rather than the Chore
  * sleep time which is invariant.
+ *
+ * TODO: change to a pool of threads
  */
 @InterfaceAudience.Private
-class LogRoller extends HasThread implements WALActionsListener {
+class LogRoller extends HasThread {
   static final Log LOG = LogFactory.getLog(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
+  private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
+      new ConcurrentHashMap<WAL, Boolean>();
   private final Server server;
   protected final RegionServerServices services;
   private volatile long lastrolltime = System.currentTimeMillis();
@@ -54,6 +63,32 @@ class LogRoller extends HasThread implements WALActionsListener {
   private final long rollperiod;
   private final int threadWakeFrequency;
 
+  public void addWAL(final WAL wal) {
+    if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
+      wal.registerWALActionsListener(new WALActionsListener.Base() {
+        @Override
+        public void logRollRequested() {
+          walNeedsRoll.put(wal, Boolean.TRUE);
+          // TODO logs will contend with each other here, replace with e.g. DelayedQueue
+          synchronized(rollLog) {
+            rollLog.set(true);
+            rollLog.notifyAll();
+          }
+        }
+      });
+    }
+  }
+
+  public void requestRollAll() {
+    for (WAL wal : walNeedsRoll.keySet()) {
+      walNeedsRoll.put(wal, Boolean.TRUE);
+    }
+    synchronized(rollLog) {
+      rollLog.set(true);
+      rollLog.notifyAll();
+    }
+  }
+
   /** @param server */
   public LogRoller(final Server server, final RegionServerServices services) {
     super();
@@ -84,19 +119,24 @@ class LogRoller extends HasThread implements WALActionsListener {
         }
         // Time for periodic roll
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
+          LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
         }
       } else if (LOG.isDebugEnabled()) {
-        LOG.debug("HLog roll requested");
+        LOG.debug("WAL roll requested");
       }
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
         this.lastrolltime = now;
-        // Force the roll if the logroll.period is elapsed or if a roll was requested.
-        // The returned value is an array of actual region names.
-        byte [][] regionsToFlush = getWAL().rollWriter(periodic || rollLog.get());
-        if (regionsToFlush != null) {
-          for (byte [] r: regionsToFlush) scheduleFlush(r);
+        for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+          final WAL wal = entry.getKey();
+          // Force the roll if the logroll.period is elapsed or if a roll was requested.
+          // The returned value is an array of actual region names.
+          final byte [][] regionsToFlush = wal.rollWriter(periodic ||
+              entry.getValue().booleanValue());
+          walNeedsRoll.put(wal, Boolean.FALSE);
+          if (regionsToFlush != null) {
+            for (byte [] r: regionsToFlush) scheduleFlush(r);
+          }
         }
       } catch (FailedLogCloseException e) {
         server.abort("Failed log close in log roller", e);
@@ -141,51 +181,4 @@ class LogRoller extends HasThread implements WALActionsListener {
     }
   }
 
-  public void logRollRequested() {
-    synchronized (rollLog) {
-      rollLog.set(true);
-      rollLog.notifyAll();
-    }
-  }
-
-  protected HLog getWAL() throws IOException {
-    return this.services.getWAL(null);
-  }
-
-  @Override
-  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-    // Not interested
-  }
-
-  @Override
-  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-    // Not interested
-  }
-
-  @Override
-  public void preLogArchive(Path oldPath, Path newPath) throws IOException {
-    // Not interested
-  }
-
-  @Override
-  public void postLogArchive(Path oldPath, Path newPath) throws IOException {
-    // Not interested
-  }
-
-  @Override
-  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-      WALEdit logEdit) {
-    // Not interested.
-  }
-
-  @Override
-  public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
-                                       WALEdit logEdit) {
-    //Not interested
-  }
-
-  @Override
-  public void logCloseRequested() {
-    // not interested
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 72242c6..b2820dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -458,11 +458,11 @@ class MemStoreFlusher implements FlushRequester {
       }
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical
-      // section, we get a DroppedSnapshotException and a replay of hlog
+      // section, we get a DroppedSnapshotException and a replay of wal
       // is required. Currently the only way to do this is a restart of
       // the server. Abort because hdfs is probably bad (HBASE-644 is a case
       // where hdfs was bad but passed the hdfs check).
-      server.abort("Replay of HLog required. Forcing server shutdown", ex);
+      server.abort("Replay of WAL required. Forcing server shutdown", ex);
       return false;
     } catch (IOException ex) {
       LOG.error("Cache flush failed" +

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
deleted file mode 100644
index 467cfdf..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-
-@InterfaceAudience.Private
-class MetaLogRoller extends LogRoller {
-  public MetaLogRoller(Server server, RegionServerServices services) {
-    super(server, services);
-  }
-  @Override
-  protected HLog getWAL() throws IOException {
-    //The argument to getWAL below could either be HRegionInfo.FIRST_META_REGIONINFO or
-    //HRegionInfo.ROOT_REGIONINFO. Both these share the same WAL.
-    return services.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index a606e8c..415e271 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.metrics2.MetricsExecutor;
@@ -50,8 +51,8 @@ class MetricsRegionServerWrapperImpl
   private BlockCache blockCache;
 
   private volatile long numStores = 0;
-  private volatile long numHLogFiles = 0;
-  private volatile long hlogFileSize = 0;
+  private volatile long numWALFiles = 0;
+  private volatile long walFileSize = 0;
   private volatile long numStoreFiles = 0;
   private volatile long memstoreSize = 0;
   private volatile long storeFileSize = 0;
@@ -274,13 +275,13 @@ class MetricsRegionServerWrapperImpl
   }
   
   @Override
-  public long getNumHLogFiles() {
-    return numHLogFiles;
+  public long getNumWALFiles() {
+    return numWALFiles;
   }
 
   @Override
-  public long getHLogFileSize() {
-    return hlogFileSize;
+  public long getWALFileSize() {
+    return walFileSize;
   }
   
   @Override
@@ -480,21 +481,11 @@ class MetricsRegionServerWrapperImpl
       }
       lastRan = currentTime;
 
+      numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory);
+      walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory);
+
       //Copy over computed values so that no thread sees half computed values.
       numStores = tempNumStores;
-      long tempNumHLogFiles = regionServer.hlog.getNumLogFiles();
-      // meta logs
-      if (regionServer.hlogForMeta != null) {
-        tempNumHLogFiles += regionServer.hlogForMeta.getNumLogFiles();
-      }
-      numHLogFiles = tempNumHLogFiles;
-      
-      long tempHlogFileSize = regionServer.hlog.getLogFileSize();
-      if (regionServer.hlogForMeta != null) {
-        tempHlogFileSize += regionServer.hlogForMeta.getLogFileSize();
-      }
-      hlogFileSize = tempHlogFileSize;
-      
       numStoreFiles = tempNumStoreFiles;
       memstoreSize = tempMemstoreSize;
       storeFileSize = tempStoreFileSize;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7d291bf..06e51c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -149,9 +149,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -687,13 +687,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
-      final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
+      final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
 
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
-      for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
-        HLogSplitter.MutationReplay m = it.next();
+      for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
+        WALSplitter.MutationReplay m = it.next();
 
         if (m.type == MutationType.PUT) {
           batchContainsPuts = true;
@@ -718,7 +718,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
       return region.batchReplay(mutations.toArray(
-        new HLogSplitter.MutationReplay[mutations.size()]), replaySeqId);
+        new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
     } finally {
       if (regionServer.metricsRegionServer != null) {
         long after = EnvironmentEdgeManager.currentTime();
@@ -1090,10 +1090,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       return builder.build();
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical
-      // section, we get a DroppedSnapshotException and a replay of hlog
+      // section, we get a DroppedSnapshotException and a replay of wal
       // is required. Currently the only way to do this is a restart of
       // the server.
-      regionServer.abort("Replay of HLog required. Forcing server shutdown", ex);
+      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
       throw new ServiceException(ex);
     } catch (IOException ie) {
       throw new ServiceException(ie);
@@ -1444,7 +1444,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       HRegion region = regionServer.getRegionByEncodedName(
         entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
       RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
-      List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
+      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
       for (WALEntry entry : entries) {
         if (regionServer.nonceManager != null) {
           long nonceGroup = entry.getKey().hasNonceGroup()
@@ -1452,9 +1452,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
           regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
         }
-        Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
-          new Pair<HLogKey, WALEdit>();
-        List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
+        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
+          new Pair<WALKey, WALEdit>();
+        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
           cells, walEntry);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
@@ -1483,7 +1483,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       region.syncWal();
 
       if (coprocessorHost != null) {
-        for (Pair<HLogKey, WALEdit> wal : walEntries) {
+        for (Pair<WALKey, WALEdit> wal : walEntries) {
           coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
             wal.getSecond());
         }
@@ -1536,14 +1536,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       checkOpen();
       requestCount.increment();
       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
-      HLog wal = regionServer.getWAL();
-      byte[][] regionsToFlush = wal.rollWriter(true);
+      regionServer.walRoller.requestRollAll();
+      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
-      if (regionsToFlush != null) {
-        for (byte[] region: regionsToFlush) {
-          builder.addRegionToFlush(ByteStringer.wrap(region));
-        }
-      }
       return builder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index d0362c8..bff2705 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -109,6 +110,8 @@ public class RegionCoprocessorHost
     private static final int LATENCY_BUFFER_SIZE = 100;
     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
         LATENCY_BUFFER_SIZE);
+    private final boolean useLegacyPre;
+    private final boolean useLegacyPost;
 
     /**
      * Constructor
@@ -122,6 +125,14 @@ public class RegionCoprocessorHost
       this.region = region;
       this.rsServices = services;
       this.sharedData = sharedData;
+      // Pick which version of the WAL related events we'll call.
+      // This way we avoid calling the new version on older RegionObservers so
+      // we can maintain binary compatibility.
+      // See notes in javadoc for RegionObserver
+      useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
+          HRegionInfo.class, WALKey.class, WALEdit.class);
+      useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
+          HRegionInfo.class, WALKey.class, WALEdit.class);
     }
 
     /** @return the region */
@@ -1309,35 +1320,76 @@ public class RegionCoprocessorHost
    * @return true if default behavior should be bypassed, false otherwise
    * @throws IOException
    */
-  public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
+  public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
       final WALEdit logEdit) throws IOException {
     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
-        oserver.preWALRestore(ctx, info, logKey, logEdit);
+        // Once we don't need to support the legacy call, replace RegionOperation with a version
+        // that's ObserverContext<RegionEnvironment> and avoid this cast.
+        final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
+        if (env.useLegacyPre) {
+          if (logKey instanceof HLogKey) {
+            oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
+          } else {
+            legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
+          }
+        } else {
+          oserver.preWALRestore(ctx, info, logKey, logEdit);
+        }
       }
     });
   }
 
   /**
+   * @return true if default behavior should be bypassed, false otherwise
+   * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
+   */
+  @Deprecated
+  public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
+      final WALEdit logEdit) throws IOException {
+    return preWALRestore(info, (WALKey)logKey, logEdit);
+  }
+
+  /**
    * @param info
    * @param logKey
    * @param logEdit
    * @throws IOException
    */
-  public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+  public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
       throws IOException {
     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
-        oserver.postWALRestore(ctx, info, logKey, logEdit);
+        // Once we don't need to support the legacy call, replace RegionOperation with a version
+        // that's ObserverContext<RegionEnvironment> and avoid this cast.
+        final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
+        if (env.useLegacyPost) {
+          if (logKey instanceof HLogKey) {
+            oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
+          } else {
+            legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
+          }
+        } else {
+          oserver.postWALRestore(ctx, info, logKey, logEdit);
+        }
       }
     });
   }
 
   /**
+   * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
+   */
+  @Deprecated
+  public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+      throws IOException {
+    postWALRestore(info, (WALKey)logKey, logEdit);
+  }
+
+  /**
    * @param familyPaths pairs of { CF, file path } submitted for bulk load
    * @return true if the default operation should be bypassed
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 479aced..879b573 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -34,7 +34,7 @@ public class RegionServerAccounting {
 
   private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
   
-  // Store the edits size during replaying HLog. Use this to roll back the  
+  // Store the edits size during replaying WAL. Use this to roll back the  
   // global memstore size once a region opening failed.
   private final ConcurrentMap<byte[], AtomicLong> replayEditsPerRegion = 
     new ConcurrentSkipListMap<byte[], AtomicLong>(Bytes.BYTES_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index e8b953e..5ea630e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -45,9 +45,9 @@ public interface RegionServerServices
    */
   boolean isStopping();
 
-  /** @return the HLog for a particular region. Pass null for getting the
+  /** @return the WAL for a particular region. Pass null for getting the
    * default (common) WAL */
-  HLog getWAL(HRegionInfo regionInfo) throws IOException;
+  WAL getWAL(HRegionInfo regionInfo) throws IOException;
 
   /**
    * @return Implementation of {@link CompactionRequestor} or null.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 335422c..eeffa8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -69,6 +70,7 @@ public class SplitLogWorker implements Runnable {
   private SplitLogWorkerCoordination coordination;
   private Configuration conf;
   private RegionServerServices server;
+
   public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
       TaskExecutor splitTaskExecutor) {
     this.server = server;
@@ -81,7 +83,8 @@ public class SplitLogWorker implements Runnable {
   }
 
   public SplitLogWorker(final Server hserver, final Configuration conf,
-      final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
+      final RegionServerServices server, final LastSequenceId sequenceIdChecker,
+      final WALFactory factory) {
     this(server, conf, server, new TaskExecutor() {
       @Override
       public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
@@ -98,8 +101,8 @@ public class SplitLogWorker implements Runnable {
         // interrupted or has encountered a transient error and when it has
         // encountered a bad non-retry-able persistent error.
         try {
-          if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
-            fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) {
+          if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
+            fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -152,6 +155,7 @@ public class SplitLogWorker implements Runnable {
       LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
     }
   }
+
   /**
    * If the worker is doing a task i.e. splitting a log file then stop the task.
    * It doesn't exit the worker thread.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
deleted file mode 100644
index 8e2ee62..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.handler;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogCounters;
-import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-
-/**
- * Handles log splitting a wal
- */
-@InterfaceAudience.Private
-public class HLogSplitterHandler extends EventHandler {
-  private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
-  private final ServerName serverName;
-  private final CancelableProgressable reporter;
-  private final AtomicInteger inProgressTasks;
-  private final TaskExecutor splitTaskExecutor;
-  private final RecoveryMode mode;
-  private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
-  private final SplitLogWorkerCoordination coordination;
-
-
-  public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
-      SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
-      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
-    super(server, EventType.RS_LOG_REPLAY);
-    this.splitTaskDetails = splitDetails;
-    this.coordination = coordination;
-    this.reporter = reporter;
-    this.inProgressTasks = inProgressTasks;
-    this.inProgressTasks.incrementAndGet();
-    this.serverName = server.getServerName();
-    this.splitTaskExecutor = splitTaskExecutor;
-    this.mode = mode;
-  }
-
-  @Override
-  public void process() throws IOException {
-    long startTime = System.currentTimeMillis();
-    try {
-      Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
-      switch (status) {
-      case DONE:
-        coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
-          SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
-        break;
-      case PREEMPTED:
-        SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
-        LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
-        break;
-      case ERR:
-        if (server != null && !server.isStopped()) {
-          coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
-            SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
-          break;
-        }
-        // if the RS is exiting then there is probably a tons of stuff
-        // that can go wrong. Resign instead of signaling error.
-        //$FALL-THROUGH$
-      case RESIGNED:
-        if (server != null && server.isStopped()) {
-          LOG.info("task execution interrupted because worker is exiting "
-              + splitTaskDetails.toString());
-        }
-        coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
-          SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
-        break;
-      }
-    } finally {
-      LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
-          + (System.currentTimeMillis() - startTime) + "ms");
-      this.inProgressTasks.decrementAndGet();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
new file mode 100644
index 0000000..9a03192
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+
+/**
+ * Handles log splitting a wal
+ */
+@InterfaceAudience.Private
+public class WALSplitterHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(WALSplitterHandler.class);
+  private final ServerName serverName;
+  private final CancelableProgressable reporter;
+  private final AtomicInteger inProgressTasks;
+  private final TaskExecutor splitTaskExecutor;
+  private final RecoveryMode mode;
+  private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
+  private final SplitLogWorkerCoordination coordination;
+
+
+  public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
+      SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
+      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
+    super(server, EventType.RS_LOG_REPLAY);
+    this.splitTaskDetails = splitDetails;
+    this.coordination = coordination;
+    this.reporter = reporter;
+    this.inProgressTasks = inProgressTasks;
+    this.inProgressTasks.incrementAndGet();
+    this.serverName = server.getServerName();
+    this.splitTaskExecutor = splitTaskExecutor;
+    this.mode = mode;
+  }
+
+  @Override
+  public void process() throws IOException {
+    long startTime = System.currentTimeMillis();
+    try {
+      Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
+      switch (status) {
+      case DONE:
+        coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
+          SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
+        break;
+      case PREEMPTED:
+        SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+        LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
+        break;
+      case ERR:
+        if (server != null && !server.isStopped()) {
+          coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
+            SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
+          break;
+        }
+        // if the RS is exiting then there is probably a tons of stuff
+        // that can go wrong. Resign instead of signaling error.
+        //$FALL-THROUGH$
+      case RESIGNED:
+        if (server != null && server.isStopped()) {
+          LOG.info("task execution interrupted because worker is exiting "
+              + splitTaskDetails.toString());
+        }
+        coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
+          SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
+        break;
+      }
+    } finally {
+      LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
+          + (System.currentTimeMillis() - startTime) + "ms");
+      this.inProgressTasks.decrementAndGet();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index d8da412..12af619 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 
 /**
- * Context that holds the various dictionaries for compression in HLog.
+ * Context that holds the various dictionaries for compression in WAL.
  */
 @InterfaceAudience.Private
-class CompressionContext {
+public class CompressionContext {
 
   static final String ENABLE_WAL_TAGS_COMPRESSION = 
       "hbase.regionserver.wal.tags.enablecompression";
 
-  final Dictionary regionDict;
-  final Dictionary tableDict;
-  final Dictionary familyDict;
+  // visible only for WALKey, until we move everything into o.a.h.h.wal
+  public final Dictionary regionDict;
+  public final Dictionary tableDict;
+  public final Dictionary familyDict;
   final Dictionary qualifierDict;
   final Dictionary rowDict;
   // Context used for compressing tags

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index b75a7cf..4032cde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -33,9 +33,13 @@ import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+
 /**
  * A set of static functions for running our custom WAL compression/decompression.
- * Also contains a command line tool to compress and uncompress HLogs.
+ * Also contains a command line tool to compress and uncompress WALs.
  */
 @InterfaceAudience.Private
 public class Compressor {
@@ -56,8 +60,8 @@ public class Compressor {
 
   private static void printHelp() {
     System.err.println("usage: Compressor <input> <output>");
-    System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
-    System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
+    System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
+    System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
     return;
   }
 
@@ -68,8 +72,8 @@ public class Compressor {
     FileSystem inFS = input.getFileSystem(conf);
     FileSystem outFS = output.getFileSystem(conf);
 
-    HLog.Reader in = HLogFactory.createReader(inFS, input, conf, null, false);
-    HLog.Writer out = null;
+    WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
+    WALProvider.Writer out = null;
 
     try {
       if (!(in instanceof ReaderBase)) {
@@ -78,9 +82,9 @@ public class Compressor {
       }
       boolean compress = ((ReaderBase)in).hasCompression();
       conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
-      out = HLogFactory.createWALWriter(outFS, output, conf);
+      out = WALFactory.createWALWriter(outFS, output, conf);
 
-      HLog.Entry e = null;
+      WAL.Entry e = null;
       while ((e = in.next()) != null) out.append(e);
     } finally {
       in.close();