You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:57 UTC
[18/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index db66f5c..c3e8650 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
@@ -124,9 +125,10 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
@@ -324,15 +326,13 @@ public class HRegionServer extends HasThread implements
*/
Chore periodicFlusher;
- // HLog and HLog roller. log is protected rather than private to avoid
- // eclipse warning when accessed by inner classes
- protected volatile HLog hlog;
- // The meta updates are written to a different hlog. If this
- // regionserver holds meta regions, then this field will be non-null.
- protected volatile HLog hlogForMeta;
+ protected volatile WALFactory walFactory;
- LogRoller hlogRoller;
- LogRoller metaHLogRoller;
+ // WAL roller. log is protected rather than private to avoid
+ // eclipse warning when accessed by inner classes
+ final LogRoller walRoller;
+ // Lazily initialized if this RegionServer hosts a meta table.
+ final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@@ -543,6 +543,7 @@ public class HRegionServer extends HasThread implements
rpcServices.start();
putUpWebUI();
+ this.walRoller = new LogRoller(this, this);
}
protected void login(UserProvider user, String host) throws IOException {
@@ -971,7 +972,7 @@ public class HRegionServer extends HasThread implements
//fsOk flag may be changed when closing regions throws exception.
if (this.fsOk) {
- closeWAL(!abortRequested);
+ shutdownWAL(!abortRequested);
}
// Make sure the proxy is down.
@@ -1073,7 +1074,8 @@ public class HRegionServer extends HasThread implements
}
}
- ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+ ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
+ throws IOException {
// We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
// per second, and other metrics As long as metrics are part of ServerLoad it's best to use
// the wrapper to compute those numbers in one place.
@@ -1092,7 +1094,7 @@ public class HRegionServer extends HasThread implements
serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
- Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
+ Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
for (String coprocessor : coprocessors) {
serverLoad.addCoprocessors(
Coprocessor.newBuilder().setName(coprocessor).build());
@@ -1101,6 +1103,10 @@ public class HRegionServer extends HasThread implements
RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
for (HRegion region : regions) {
serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
+ for (String coprocessor :
+ getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()) {
+ serverLoad.addCoprocessors(Coprocessor.newBuilder().setName(coprocessor).build());
+ }
}
serverLoad.setReportStartTime(reportStartTime);
serverLoad.setReportEndTime(reportEndTime);
@@ -1189,33 +1195,24 @@ public class HRegionServer extends HasThread implements
return interrupted;
}
- private void closeWAL(final boolean delete) {
- if (this.hlogForMeta != null) {
- // All hlogs (meta and non-meta) are in the same directory. Don't call
- // closeAndDelete here since that would delete all hlogs not just the
- // meta ones. We will just 'close' the hlog for meta here, and leave
- // the directory cleanup to the follow-on closeAndDelete call.
- try {
- this.hlogForMeta.close();
- } catch (Throwable e) {
- LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
- }
- }
- if (this.hlog != null) {
+ private void shutdownWAL(final boolean close) {
+ if (this.walFactory != null) {
try {
- if (delete) {
- hlog.closeAndDelete();
+ if (close) {
+ walFactory.close();
} else {
- hlog.close();
+ walFactory.shutdown();
}
} catch (Throwable e) {
- LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
+ e = RemoteExceptionHandler.checkThrowable(e);
+ LOG.error("Shutdown / close of WAL failed: " + e);
+ LOG.debug("Shutdown / close exception details:", e);
}
}
}
/*
- * Run init. Sets up hlog and starts up all server threads.
+ * Run init. Sets up wal and starts up all server threads.
*
* @param c Extra configuration.
*/
@@ -1253,7 +1250,7 @@ public class HRegionServer extends HasThread implements
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
this.cacheConfig = new CacheConfig(conf);
- this.hlog = setupWALAndReplication();
+ this.walFactory = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
@@ -1497,10 +1494,10 @@ public class HRegionServer extends HasThread implements
* @return A WAL instance.
* @throws IOException
*/
- private HLog setupWALAndReplication() throws IOException {
+ private WALFactory setupWALAndReplication() throws IOException {
+ // TODO Replication make assumptions here based on the default filesystem impl
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- final String logName
- = HLogUtil.getHLogDirectoryName(this.serverName.toString());
+ final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
Path logdir = new Path(rootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
@@ -1513,66 +1510,44 @@ public class HRegionServer extends HasThread implements
// log directories.
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
- return instantiateHLog(rootDir, logName);
- }
-
- private HLog getMetaWAL() throws IOException {
- if (this.hlogForMeta != null) return this.hlogForMeta;
- final String logName = HLogUtil.getHLogDirectoryName(this.serverName.toString());
- Path logdir = new Path(rootDir, logName);
- if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
- this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
- this.conf, getMetaWALActionListeners(), this.serverName.toString());
- return this.hlogForMeta;
- }
-
- /**
- * Called by {@link #setupWALAndReplication()} creating WAL instance.
- * @param rootdir
- * @param logName
- * @return WAL instance.
- * @throws IOException
- */
- protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
- return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
- getWALActionListeners(), this.serverName.toString());
- }
-
- /**
- * Called by {@link #instantiateHLog(Path, String)} setting up WAL instance.
- * Add any {@link WALActionsListener}s you want inserted before WAL startup.
- * @return List of WALActionsListener that will be passed in to
- * {@link org.apache.hadoop.hbase.regionserver.wal.FSHLog} on construction.
- */
- protected List<WALActionsListener> getWALActionListeners() {
- List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
- // Log roller.
- this.hlogRoller = new LogRoller(this, this);
- listeners.add(this.hlogRoller);
+ // listeners the wal factory will add to wals it creates.
+ final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+ listeners.add(new MetricsWAL());
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler.getWALActionsListener() != null) {
// Replication handler is an implementation of WALActionsListener.
listeners.add(this.replicationSourceHandler.getWALActionsListener());
}
- return listeners;
+
+ return new WALFactory(conf, listeners, serverName.toString());
}
- protected List<WALActionsListener> getMetaWALActionListeners() {
- List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+ /**
+ * We initialize the roller for the wal that handles meta lazily
+ * since we don't know if this regionserver will handle it. All calls to
+ * this method return a reference to the that same roller. As newly referenced
+ * meta regions are brought online, they will be offered to the roller for maintenance.
+ * As a part of that registration process, the roller will add itself as a
+ * listener on the wal.
+ */
+ protected LogRoller ensureMetaWALRoller() {
// Using a tmp log roller to ensure metaLogRoller is alive once it is not
// null
- MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
- String n = Thread.currentThread().getName();
- Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
- n + "-MetaLogRoller", uncaughtExceptionHandler);
- this.metaHLogRoller = tmpLogRoller;
- tmpLogRoller = null;
- listeners.add(this.metaHLogRoller);
- return listeners;
- }
-
- protected LogRoller getLogRoller() {
- return hlogRoller;
+ LogRoller roller = metawalRoller.get();
+ if (null == roller) {
+ LogRoller tmpLogRoller = new LogRoller(this, this);
+ String n = Thread.currentThread().getName();
+ Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
+ n + "-MetaLogRoller", uncaughtExceptionHandler);
+ if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
+ roller = tmpLogRoller;
+ } else {
+ // Another thread won starting the roller
+ Threads.shutdown(tmpLogRoller.getThread());
+ roller = metawalRoller.get();
+ }
+ }
+ return roller;
}
public MetricsRegionServer getRegionServerMetrics() {
@@ -1615,7 +1590,7 @@ public class HRegionServer extends HasThread implements
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
- Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller",
+ Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
this.cacheFlusher.start(uncaughtExceptionHandler);
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() +
@@ -1662,7 +1637,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this);
+ this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
splitLogWorker.start();
}
@@ -1725,38 +1700,37 @@ public class HRegionServer extends HasThread implements
}
// Verify that all threads are alive
if (!(leases.isAlive()
- && cacheFlusher.isAlive() && hlogRoller.isAlive()
+ && cacheFlusher.isAlive() && walRoller.isAlive()
&& this.compactionChecker.isAlive()
&& this.periodicFlusher.isAlive())) {
stop("One or more threads are no longer alive -- stop");
return false;
}
- if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
- stop("Meta HLog roller thread is no longer alive -- stop");
+ final LogRoller metawalRoller = this.metawalRoller.get();
+ if (metawalRoller != null && !metawalRoller.isAlive()) {
+ stop("Meta WAL roller thread is no longer alive -- stop");
return false;
}
return true;
}
- public HLog getWAL() {
- try {
- return getWAL(null);
- } catch (IOException e) {
- LOG.warn("getWAL threw exception " + e);
- return null;
- }
- }
+ private static final byte[] UNSPECIFIED_REGION = new byte[]{};
@Override
- public HLog getWAL(HRegionInfo regionInfo) throws IOException {
- //TODO: at some point this should delegate to the HLogFactory
- //currently, we don't care about the region as much as we care about the
- //table.. (hence checking the tablename below)
+ public WAL getWAL(HRegionInfo regionInfo) throws IOException {
+ WAL wal;
+ LogRoller roller = walRoller;
//_ROOT_ and hbase:meta regions have separate WAL.
if (regionInfo != null && regionInfo.isMetaTable()) {
- return getMetaWAL();
+ roller = ensureMetaWALRoller();
+ wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
+ } else if (regionInfo == null) {
+ wal = walFactory.getWAL(UNSPECIFIED_REGION);
+ } else {
+ wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
}
- return this.hlog;
+ roller.addWAL(wal);
+ return wal;
}
@Override
@@ -1982,11 +1956,12 @@ public class HRegionServer extends HasThread implements
if (this.spanReceiverHost != null) {
this.spanReceiverHost.closeReceivers();
}
- if (this.hlogRoller != null) {
- Threads.shutdown(this.hlogRoller.getThread());
+ if (this.walRoller != null) {
+ Threads.shutdown(this.walRoller.getThread());
}
- if (this.metaHLogRoller != null) {
- Threads.shutdown(this.metaHLogRoller.getThread());
+ final LogRoller metawalRoller = this.metawalRoller.get();
+ if (metawalRoller != null) {
+ Threads.shutdown(metawalRoller.getThread());
}
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
@@ -2524,11 +2499,24 @@ public class HRegionServer extends HasThread implements
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getRegionServerCoprocessors() {
- TreeSet<String> coprocessors = new TreeSet<String>(
- this.hlog.getCoprocessorHost().getCoprocessors());
+ TreeSet<String> coprocessors = new TreeSet<String>();
+ try {
+ coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
+ } catch (IOException exception) {
+ LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
+ "skipping.");
+ LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
+ }
Collection<HRegion> regions = getOnlineRegionsLocalContext();
for (HRegion region: regions) {
coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
+ try {
+ coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
+ } catch (IOException exception) {
+ LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
+ "; skipping.");
+ LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
+ }
}
return coprocessors.toArray(new String[coprocessors.size()]);
}
@@ -2665,16 +2653,22 @@ public class HRegionServer extends HasThread implements
HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
if (destination != null) {
- HLog wal = getWAL();
- long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
- if (closeSeqNum == HConstants.NO_SEQNUM) {
- // No edits in WAL for this region; get the sequence number when the region was opened.
- closeSeqNum = r.getOpenSeqNum();
+ try {
+ WAL wal = getWAL(r.getRegionInfo());
+ long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
if (closeSeqNum == HConstants.NO_SEQNUM) {
- closeSeqNum = 0;
+ // No edits in WAL for this region; get the sequence number when the region was opened.
+ closeSeqNum = r.getOpenSeqNum();
+ if (closeSeqNum == HConstants.NO_SEQNUM) {
+ closeSeqNum = 0;
+ }
}
+ addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
+ } catch (IOException exception) {
+ LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
+ "; not adding to moved regions.");
+ LOG.debug("Exception details for failure to get wal", exception);
}
- addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
}
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
return toReturn != null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index eb51c50..2adaaba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -79,7 +79,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1216,7 +1216,7 @@ public class HStore implements Store {
*/
private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
Collection<StoreFile> newFiles) throws IOException {
- if (region.getLog() == null) return;
+ if (region.getWAL() == null) return;
List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
for (StoreFile f : filesCompacted) {
inputPaths.add(f.getPath());
@@ -1228,7 +1228,7 @@ public class HStore implements Store {
HRegionInfo info = this.region.getRegionInfo();
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
- HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+ WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 8179c98..821756d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,15 +18,20 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
@@ -36,17 +41,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
- * Runs periodically to determine if the HLog should be rolled.
+ * Runs periodically to determine if the WAL should be rolled.
*
* NOTE: This class extends Thread rather than Chore because the sleep time
* can be interrupted when there is something to do, rather than the Chore
* sleep time which is invariant.
+ *
+ * TODO: change to a pool of threads
*/
@InterfaceAudience.Private
-class LogRoller extends HasThread implements WALActionsListener {
+class LogRoller extends HasThread {
static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
+ private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
+ new ConcurrentHashMap<WAL, Boolean>();
private final Server server;
protected final RegionServerServices services;
private volatile long lastrolltime = System.currentTimeMillis();
@@ -54,6 +63,32 @@ class LogRoller extends HasThread implements WALActionsListener {
private final long rollperiod;
private final int threadWakeFrequency;
+ public void addWAL(final WAL wal) {
+ if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
+ wal.registerWALActionsListener(new WALActionsListener.Base() {
+ @Override
+ public void logRollRequested() {
+ walNeedsRoll.put(wal, Boolean.TRUE);
+ // TODO logs will contend with each other here, replace with e.g. DelayedQueue
+ synchronized(rollLog) {
+ rollLog.set(true);
+ rollLog.notifyAll();
+ }
+ }
+ });
+ }
+ }
+
+ public void requestRollAll() {
+ for (WAL wal : walNeedsRoll.keySet()) {
+ walNeedsRoll.put(wal, Boolean.TRUE);
+ }
+ synchronized(rollLog) {
+ rollLog.set(true);
+ rollLog.notifyAll();
+ }
+ }
+
/** @param server */
public LogRoller(final Server server, final RegionServerServices services) {
super();
@@ -84,19 +119,24 @@ class LogRoller extends HasThread implements WALActionsListener {
}
// Time for periodic roll
if (LOG.isDebugEnabled()) {
- LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
+ LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("HLog roll requested");
+ LOG.debug("WAL roll requested");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
this.lastrolltime = now;
- // Force the roll if the logroll.period is elapsed or if a roll was requested.
- // The returned value is an array of actual region names.
- byte [][] regionsToFlush = getWAL().rollWriter(periodic || rollLog.get());
- if (regionsToFlush != null) {
- for (byte [] r: regionsToFlush) scheduleFlush(r);
+ for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+ final WAL wal = entry.getKey();
+ // Force the roll if the logroll.period is elapsed or if a roll was requested.
+ // The returned value is an array of actual region names.
+ final byte [][] regionsToFlush = wal.rollWriter(periodic ||
+ entry.getValue().booleanValue());
+ walNeedsRoll.put(wal, Boolean.FALSE);
+ if (regionsToFlush != null) {
+ for (byte [] r: regionsToFlush) scheduleFlush(r);
+ }
}
} catch (FailedLogCloseException e) {
server.abort("Failed log close in log roller", e);
@@ -141,51 +181,4 @@ class LogRoller extends HasThread implements WALActionsListener {
}
}
- public void logRollRequested() {
- synchronized (rollLog) {
- rollLog.set(true);
- rollLog.notifyAll();
- }
- }
-
- protected HLog getWAL() throws IOException {
- return this.services.getWAL(null);
- }
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void preLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void postLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
- WALEdit logEdit) {
- // Not interested.
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
- WALEdit logEdit) {
- //Not interested
- }
-
- @Override
- public void logCloseRequested() {
- // not interested
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 72242c6..b2820dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -458,11 +458,11 @@ class MemStoreFlusher implements FlushRequester {
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
- // section, we get a DroppedSnapshotException and a replay of hlog
+ // section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server. Abort because hdfs is probably bad (HBASE-644 is a case
// where hdfs was bad but passed the hdfs check).
- server.abort("Replay of HLog required. Forcing server shutdown", ex);
+ server.abort("Replay of WAL required. Forcing server shutdown", ex);
return false;
} catch (IOException ex) {
LOG.error("Cache flush failed" +
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
deleted file mode 100644
index 467cfdf..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetaLogRoller.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-
-@InterfaceAudience.Private
-class MetaLogRoller extends LogRoller {
- public MetaLogRoller(Server server, RegionServerServices services) {
- super(server, services);
- }
- @Override
- protected HLog getWAL() throws IOException {
- //The argument to getWAL below could either be HRegionInfo.FIRST_META_REGIONINFO or
- //HRegionInfo.ROOT_REGIONINFO. Both these share the same WAL.
- return services.getWAL(HRegionInfo.FIRST_META_REGIONINFO);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index a606e8c..415e271 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.metrics2.MetricsExecutor;
@@ -50,8 +51,8 @@ class MetricsRegionServerWrapperImpl
private BlockCache blockCache;
private volatile long numStores = 0;
- private volatile long numHLogFiles = 0;
- private volatile long hlogFileSize = 0;
+ private volatile long numWALFiles = 0;
+ private volatile long walFileSize = 0;
private volatile long numStoreFiles = 0;
private volatile long memstoreSize = 0;
private volatile long storeFileSize = 0;
@@ -274,13 +275,13 @@ class MetricsRegionServerWrapperImpl
}
@Override
- public long getNumHLogFiles() {
- return numHLogFiles;
+ public long getNumWALFiles() {
+ return numWALFiles;
}
@Override
- public long getHLogFileSize() {
- return hlogFileSize;
+ public long getWALFileSize() {
+ return walFileSize;
}
@Override
@@ -480,21 +481,11 @@ class MetricsRegionServerWrapperImpl
}
lastRan = currentTime;
+ numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory);
+ walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory);
+
//Copy over computed values so that no thread sees half computed values.
numStores = tempNumStores;
- long tempNumHLogFiles = regionServer.hlog.getNumLogFiles();
- // meta logs
- if (regionServer.hlogForMeta != null) {
- tempNumHLogFiles += regionServer.hlogForMeta.getNumLogFiles();
- }
- numHLogFiles = tempNumHLogFiles;
-
- long tempHlogFileSize = regionServer.hlog.getLogFileSize();
- if (regionServer.hlogForMeta != null) {
- tempHlogFileSize += regionServer.hlogForMeta.getLogFileSize();
- }
- hlogFileSize = tempHlogFileSize;
-
numStoreFiles = tempNumStoreFiles;
memstoreSize = tempMemstoreSize;
storeFileSize = tempStoreFileSize;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7d291bf..06e51c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -149,9 +149,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -687,13 +687,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws IOException
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
- final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
+ final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
- for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
- HLogSplitter.MutationReplay m = it.next();
+ for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
+ WALSplitter.MutationReplay m = it.next();
if (m.type == MutationType.PUT) {
batchContainsPuts = true;
@@ -718,7 +718,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
return region.batchReplay(mutations.toArray(
- new HLogSplitter.MutationReplay[mutations.size()]), replaySeqId);
+ new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
} finally {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
@@ -1090,10 +1090,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return builder.build();
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
- // section, we get a DroppedSnapshotException and a replay of hlog
+ // section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server.
- regionServer.abort("Replay of HLog required. Forcing server shutdown", ex);
+ regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
@@ -1444,7 +1444,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HRegion region = regionServer.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
- List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
+ List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
for (WALEntry entry : entries) {
if (regionServer.nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
@@ -1452,9 +1452,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
}
- Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
- new Pair<HLogKey, WALEdit>();
- List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
+ Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
+ new Pair<WALKey, WALEdit>();
+ List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
cells, walEntry);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
@@ -1483,7 +1483,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.syncWal();
if (coprocessorHost != null) {
- for (Pair<HLogKey, WALEdit> wal : walEntries) {
+ for (Pair<WALKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
@@ -1536,14 +1536,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
- HLog wal = regionServer.getWAL();
- byte[][] regionsToFlush = wal.rollWriter(true);
+ regionServer.walRoller.requestRollAll();
+ regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
- if (regionsToFlush != null) {
- for (byte[] region: regionsToFlush) {
- builder.addRegionToFlush(ByteStringer.wrap(region));
- }
- }
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index d0362c8..bff2705 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -109,6 +110,8 @@ public class RegionCoprocessorHost
private static final int LATENCY_BUFFER_SIZE = 100;
private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
LATENCY_BUFFER_SIZE);
+ private final boolean useLegacyPre;
+ private final boolean useLegacyPost;
/**
* Constructor
@@ -122,6 +125,14 @@ public class RegionCoprocessorHost
this.region = region;
this.rsServices = services;
this.sharedData = sharedData;
+ // Pick which version of the WAL related events we'll call.
+ // This way we avoid calling the new version on older RegionObservers so
+ // we can maintain binary compatibility.
+ // See notes in javadoc for RegionObserver
+ useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
+ HRegionInfo.class, WALKey.class, WALEdit.class);
+ useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
+ HRegionInfo.class, WALKey.class, WALEdit.class);
}
/** @return the region */
@@ -1309,35 +1320,76 @@ public class RegionCoprocessorHost
* @return true if default behavior should be bypassed, false otherwise
* @throws IOException
*/
- public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
+ public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
final WALEdit logEdit) throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
- oserver.preWALRestore(ctx, info, logKey, logEdit);
+ // Once we don't need to support the legacy call, replace RegionOperation with a version
+ // that's ObserverContext<RegionEnvironment> and avoid this cast.
+ final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
+ if (env.useLegacyPre) {
+ if (logKey instanceof HLogKey) {
+ oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
+ } else {
+ legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
+ }
+ } else {
+ oserver.preWALRestore(ctx, info, logKey, logEdit);
+ }
}
});
}
/**
+ * @return true if default behavior should be bypassed, false otherwise
+ * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
+ */
+ @Deprecated
+ public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
+ final WALEdit logEdit) throws IOException {
+ return preWALRestore(info, (WALKey)logKey, logEdit);
+ }
+
+ /**
* @param info
* @param logKey
* @param logEdit
* @throws IOException
*/
- public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+ public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
- oserver.postWALRestore(ctx, info, logKey, logEdit);
+ // Once we don't need to support the legacy call, replace RegionOperation with a version
+ // that's ObserverContext<RegionEnvironment> and avoid this cast.
+ final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
+ if (env.useLegacyPost) {
+ if (logKey instanceof HLogKey) {
+ oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
+ } else {
+ legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
+ }
+ } else {
+ oserver.postWALRestore(ctx, info, logKey, logEdit);
+ }
}
});
}
/**
+ * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
+ */
+ @Deprecated
+ public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+ throws IOException {
+ postWALRestore(info, (WALKey)logKey, logEdit);
+ }
+
+ /**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
* @return true if the default operation should be bypassed
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 479aced..879b573 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -34,7 +34,7 @@ public class RegionServerAccounting {
private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
- // Store the edits size during replaying HLog. Use this to roll back the
+ // Store the edits size during replaying WAL. Use this to roll back the
// global memstore size once a region opening failed.
private final ConcurrentMap<byte[], AtomicLong> replayEditsPerRegion =
new ConcurrentSkipListMap<byte[], AtomicLong>(Bytes.BYTES_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index e8b953e..5ea630e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.zookeeper.KeeperException;
/**
@@ -45,9 +45,9 @@ public interface RegionServerServices
*/
boolean isStopping();
- /** @return the HLog for a particular region. Pass null for getting the
+ /** @return the WAL for a particular region. Pass null for getting the
* default (common) WAL */
- HLog getWAL(HRegionInfo regionInfo) throws IOException;
+ WAL getWAL(HRegionInfo regionInfo) throws IOException;
/**
* @return Implementation of {@link CompactionRequestor} or null.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 335422c..eeffa8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -69,6 +70,7 @@ public class SplitLogWorker implements Runnable {
private SplitLogWorkerCoordination coordination;
private Configuration conf;
private RegionServerServices server;
+
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) {
this.server = server;
@@ -81,7 +83,8 @@ public class SplitLogWorker implements Runnable {
}
public SplitLogWorker(final Server hserver, final Configuration conf,
- final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
+ final RegionServerServices server, final LastSequenceId sequenceIdChecker,
+ final WALFactory factory) {
this(server, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
@@ -98,8 +101,8 @@ public class SplitLogWorker implements Runnable {
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
- if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
- fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) {
+ if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
+ fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@@ -152,6 +155,7 @@ public class SplitLogWorker implements Runnable {
LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
}
}
+
/**
* If the worker is doing a task i.e. splitting a log file then stop the task.
* It doesn't exit the worker thread.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
deleted file mode 100644
index 8e2ee62..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.handler;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogCounters;
-import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-
-/**
- * Handles log splitting a wal
- */
-@InterfaceAudience.Private
-public class HLogSplitterHandler extends EventHandler {
- private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
- private final ServerName serverName;
- private final CancelableProgressable reporter;
- private final AtomicInteger inProgressTasks;
- private final TaskExecutor splitTaskExecutor;
- private final RecoveryMode mode;
- private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
- private final SplitLogWorkerCoordination coordination;
-
-
- public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
- SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
- AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
- super(server, EventType.RS_LOG_REPLAY);
- this.splitTaskDetails = splitDetails;
- this.coordination = coordination;
- this.reporter = reporter;
- this.inProgressTasks = inProgressTasks;
- this.inProgressTasks.incrementAndGet();
- this.serverName = server.getServerName();
- this.splitTaskExecutor = splitTaskExecutor;
- this.mode = mode;
- }
-
- @Override
- public void process() throws IOException {
- long startTime = System.currentTimeMillis();
- try {
- Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
- switch (status) {
- case DONE:
- coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
- SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
- break;
- case PREEMPTED:
- SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
- LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
- break;
- case ERR:
- if (server != null && !server.isStopped()) {
- coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
- SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
- break;
- }
- // if the RS is exiting then there is probably a tons of stuff
- // that can go wrong. Resign instead of signaling error.
- //$FALL-THROUGH$
- case RESIGNED:
- if (server != null && server.isStopped()) {
- LOG.info("task execution interrupted because worker is exiting "
- + splitTaskDetails.toString());
- }
- coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
- SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
- break;
- }
- } finally {
- LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
- + (System.currentTimeMillis() - startTime) + "ms");
- this.inProgressTasks.decrementAndGet();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
new file mode 100644
index 0000000..9a03192
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+
+/**
+ * Handles log splitting a wal
+ */
+@InterfaceAudience.Private
+public class WALSplitterHandler extends EventHandler {
+ private static final Log LOG = LogFactory.getLog(WALSplitterHandler.class);
+ private final ServerName serverName;
+ private final CancelableProgressable reporter;
+ private final AtomicInteger inProgressTasks;
+ private final TaskExecutor splitTaskExecutor;
+ private final RecoveryMode mode;
+ private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
+ private final SplitLogWorkerCoordination coordination;
+
+
+ public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
+ SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
+ AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
+ super(server, EventType.RS_LOG_REPLAY);
+ this.splitTaskDetails = splitDetails;
+ this.coordination = coordination;
+ this.reporter = reporter;
+ this.inProgressTasks = inProgressTasks;
+ this.inProgressTasks.incrementAndGet();
+ this.serverName = server.getServerName();
+ this.splitTaskExecutor = splitTaskExecutor;
+ this.mode = mode;
+ }
+
+ @Override
+ public void process() throws IOException {
+ long startTime = System.currentTimeMillis();
+ try {
+ Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
+ switch (status) {
+ case DONE:
+ coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
+ SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
+ break;
+ case PREEMPTED:
+ SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
+ LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
+ break;
+ case ERR:
+ if (server != null && !server.isStopped()) {
+ coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
+ SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
+ break;
+ }
+ // if the RS is exiting then there is probably a tons of stuff
+ // that can go wrong. Resign instead of signaling error.
+ //$FALL-THROUGH$
+ case RESIGNED:
+ if (server != null && server.isStopped()) {
+ LOG.info("task execution interrupted because worker is exiting "
+ + splitTaskDetails.toString());
+ }
+ coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
+ SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
+ break;
+ }
+ } finally {
+ LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
+ + (System.currentTimeMillis() - startTime) + "ms");
+ this.inProgressTasks.decrementAndGet();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index d8da412..12af619 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.Dictionary;
/**
- * Context that holds the various dictionaries for compression in HLog.
+ * Context that holds the various dictionaries for compression in WAL.
*/
@InterfaceAudience.Private
-class CompressionContext {
+public class CompressionContext {
static final String ENABLE_WAL_TAGS_COMPRESSION =
"hbase.regionserver.wal.tags.enablecompression";
- final Dictionary regionDict;
- final Dictionary tableDict;
- final Dictionary familyDict;
+ // visible only for WALKey, until we move everything into o.a.h.h.wal
+ public final Dictionary regionDict;
+ public final Dictionary tableDict;
+ public final Dictionary familyDict;
final Dictionary qualifierDict;
final Dictionary rowDict;
// Context used for compressing tags
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index b75a7cf..4032cde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -33,9 +33,13 @@ import org.apache.hadoop.io.WritableUtils;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+
/**
* A set of static functions for running our custom WAL compression/decompression.
- * Also contains a command line tool to compress and uncompress HLogs.
+ * Also contains a command line tool to compress and uncompress WALs.
*/
@InterfaceAudience.Private
public class Compressor {
@@ -56,8 +60,8 @@ public class Compressor {
private static void printHelp() {
System.err.println("usage: Compressor <input> <output>");
- System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
- System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
+ System.err.println("If <input> WAL is compressed, <output> will be decompressed.");
+ System.err.println("If <input> WAL is uncompressed, <output> will be compressed.");
return;
}
@@ -68,8 +72,8 @@ public class Compressor {
FileSystem inFS = input.getFileSystem(conf);
FileSystem outFS = output.getFileSystem(conf);
- HLog.Reader in = HLogFactory.createReader(inFS, input, conf, null, false);
- HLog.Writer out = null;
+ WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
+ WALProvider.Writer out = null;
try {
if (!(in instanceof ReaderBase)) {
@@ -78,9 +82,9 @@ public class Compressor {
}
boolean compress = ((ReaderBase)in).hasCompression();
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
- out = HLogFactory.createWALWriter(outFS, output, conf);
+ out = WALFactory.createWALWriter(outFS, output, conf);
- HLog.Entry e = null;
+ WAL.Entry e = null;
while ((e = in.next()) != null) out.append(e);
} finally {
in.close();