You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:58 UTC
[19/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index bd3dceb..23ef6a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLog
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -101,8 +101,7 @@ public class SplitLogManager {
private Server server;
private final Stoppable stopper;
- private FileSystem fs;
- private Configuration conf;
+ private final Configuration conf;
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
@@ -160,16 +159,34 @@ public class SplitLogManager {
}
private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
+ return getFileList(conf, logDirs, filter);
+ }
+
+ /**
+ * Get a list of paths that need to be split given a set of server-specific directories and
+ * optinally a filter.
+ *
+ * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
+ * layout.
+ *
+ * Should be package-private, but is needed by
+ * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
+ * Configuration, WALFactory)} for tests.
+ */
+ @VisibleForTesting
+ public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
+ final PathFilter filter)
+ throws IOException {
List<FileStatus> fileStatus = new ArrayList<FileStatus>();
- for (Path hLogDir : logDirs) {
- this.fs = hLogDir.getFileSystem(conf);
- if (!fs.exists(hLogDir)) {
- LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
+ for (Path logDir : logDirs) {
+ final FileSystem fs = logDir.getFileSystem(conf);
+ if (!fs.exists(logDir)) {
+ LOG.warn(logDir + " doesn't exist. Nothing to do!");
continue;
}
- FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
+ FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
if (logfiles == null || logfiles.length == 0) {
- LOG.info(hLogDir + " is empty dir, no logs to split");
+ LOG.info(logDir + " is empty dir, no logs to split");
} else {
Collections.addAll(fileStatus, logfiles);
}
@@ -179,7 +196,7 @@ public class SplitLogManager {
}
/**
- * @param logDir one region sever hlog dir path in .logs
+ * @param logDir one region sever wal dir path in .logs
* @throws IOException if there was an error while splitting any log file
* @return cumulative size of the logfiles split
* @throws IOException
@@ -205,7 +222,7 @@ public class SplitLogManager {
Set<ServerName> serverNames = new HashSet<ServerName>();
for (Path logDir : logDirs) {
try {
- ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
+ ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
if (serverName != null) {
serverNames.add(serverName);
}
@@ -272,6 +289,7 @@ public class SplitLogManager {
}
for (Path logDir : logDirs) {
status.setStatus("Cleaning up log directory...");
+ final FileSystem fs = logDir.getFileSystem(conf);
try {
if (fs.exists(logDir) && !fs.delete(logDir, false)) {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 6c8e428..f68bfa2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
/**
- * This Chore, every time it runs, will attempt to delete the HLogs in the old logs folder. The HLog
+ * This Chore, every time it runs, will attempt to delete the WALs in the old logs folder. The WAL
* is only deleted if none of the cleaner delegates says otherwise.
* @see BaseLogCleanerDelegate
*/
@@ -51,6 +51,6 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
@Override
protected boolean validate(Path file) {
- return HLogUtil.validateHLogFilename(file.getName());
+ return DefaultWALProvider.validateWALFilename(file.getName());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
index 3a39fb4..9d68601 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveLogCleaner.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
- * Log cleaner that uses the timestamp of the hlog to determine if it should
+ * Log cleaner that uses the timestamp of the wal to determine if it should
* be deleted. By default they are allowed to live for 10 minutes.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
index be6b391..0e72496 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
@@ -67,7 +67,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
boolean distributedLogReplay =
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
- if (this.shouldSplitHlog) {
+ if (this.shouldSplitWal) {
LOG.info("Splitting hbase:meta logs for " + serverName);
if (distributedLogReplay) {
Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
@@ -101,7 +101,7 @@ public class MetaServerShutdownHandler extends ServerShutdownHandler {
}
try {
- if (this.shouldSplitHlog && distributedLogReplay) {
+ if (this.shouldSplitWal && distributedLogReplay) {
if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index d506fe5..907d5ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -61,19 +61,19 @@ public class ServerShutdownHandler extends EventHandler {
protected final ServerName serverName;
protected final MasterServices services;
protected final DeadServer deadServers;
- protected final boolean shouldSplitHlog; // whether to split HLog or not
+ protected final boolean shouldSplitWal; // whether to split WAL or not
protected final int regionAssignmentWaitTimeout;
public ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName,
- final boolean shouldSplitHlog) {
+ final boolean shouldSplitWal) {
this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN,
- shouldSplitHlog);
+ shouldSplitWal);
}
ServerShutdownHandler(final Server server, final MasterServices services,
final DeadServer deadServers, final ServerName serverName, EventType type,
- final boolean shouldSplitHlog) {
+ final boolean shouldSplitWal) {
super(server, type);
this.serverName = serverName;
this.server = server;
@@ -82,7 +82,7 @@ public class ServerShutdownHandler extends EventHandler {
if (!this.deadServers.isDeadServer(this.serverName)) {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
- this.shouldSplitHlog = shouldSplitHlog;
+ this.shouldSplitWal = shouldSplitWal;
this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}
@@ -138,7 +138,7 @@ public class ServerShutdownHandler extends EventHandler {
AssignmentManager am = services.getAssignmentManager();
ServerManager serverManager = services.getServerManager();
if (isCarryingMeta() /* hbase:meta */ || !am.isFailoverCleanupDone()) {
- serverManager.processDeadServer(serverName, this.shouldSplitHlog);
+ serverManager.processDeadServer(serverName, this.shouldSplitWal);
return;
}
@@ -200,7 +200,7 @@ public class ServerShutdownHandler extends EventHandler {
(this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
try {
- if (this.shouldSplitHlog) {
+ if (this.shouldSplitWal) {
if (distributedLogReplay) {
LOG.info("Mark regions in recovery for crashed server " + serverName +
" before assignment; regions=" + hris);
@@ -302,13 +302,13 @@ public class ServerShutdownHandler extends EventHandler {
throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
} catch (IOException ioe) {
LOG.info("Caught " + ioe + " during region assignment, will retry");
- // Only do HLog splitting if shouldSplitHlog and in DLR mode
+ // Only do wal splitting if shouldSplitWal and in DLR mode
serverManager.processDeadServer(serverName,
- this.shouldSplitHlog && distributedLogReplay);
+ this.shouldSplitWal && distributedLogReplay);
return;
}
- if (this.shouldSplitHlog && distributedLogReplay) {
+ if (this.shouldSplitWal && distributedLogReplay) {
// wait for region assignment completes
for (HRegionInfo hri : toAssignRegions) {
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
index d5e174d..a927db3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java
@@ -46,11 +46,11 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
* snapshots (ms)
*/
- static final String HLOG_CACHE_REFRESH_PERIOD_CONF_KEY =
+ static final String WAL_CACHE_REFRESH_PERIOD_CONF_KEY =
"hbase.master.hlogcleaner.plugins.snapshot.period";
/** Refresh cache, by default, every 5 minutes */
- private static final long DEFAULT_HLOG_CACHE_REFRESH_PERIOD = 300000;
+ private static final long DEFAULT_WAL_CACHE_REFRESH_PERIOD = 300000;
private SnapshotFileCache cache;
@@ -77,14 +77,14 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
super.setConf(conf);
try {
long cacheRefreshPeriod = conf.getLong(
- HLOG_CACHE_REFRESH_PERIOD_CONF_KEY, DEFAULT_HLOG_CACHE_REFRESH_PERIOD);
+ WAL_CACHE_REFRESH_PERIOD_CONF_KEY, DEFAULT_WAL_CACHE_REFRESH_PERIOD);
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
Path rootDir = FSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-log-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
throws IOException {
- return SnapshotReferenceUtil.getHLogNames(fs, snapshotDir);
+ return SnapshotReferenceUtil.getWALNames(fs, snapshotDir);
}
});
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
index e34420d..d1bd167 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
@@ -48,9 +48,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -380,12 +379,11 @@ public class NamespaceUpgrade implements Tool {
ServerName fakeServer = ServerName.valueOf("nsupgrade", 96, 123);
- String metaLogName = HLogUtil.getHLogDirectoryName(fakeServer.toString());
- HLog metaHLog = HLogFactory.createMetaHLog(fs, rootDir,
- metaLogName, conf, null,
- fakeServer.toString());
+ final WALFactory walFactory = new WALFactory(conf, null, fakeServer.toString());
+ WAL metawal = walFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+ FSTableDescriptors fst = new FSTableDescriptors(conf);
HRegion meta = HRegion.openHRegion(rootDir, HRegionInfo.FIRST_META_REGIONINFO,
- HTableDescriptor.META_TABLEDESC, metaHLog, conf);
+ fst.get(TableName.META_TABLE_NAME), metawal, conf);
HRegion region = null;
try {
for(Path regionDir : FSUtils.getRegionDirs(fs, oldTablePath)) {
@@ -402,7 +400,7 @@ public class NamespaceUpgrade implements Tool {
new HRegion(
HRegionFileSystem.openRegionFromFileSystem(conf, fs, oldTablePath,
oldRegionInfo, false),
- metaHLog,
+ metawal,
conf,
oldDesc,
null);
@@ -439,7 +437,7 @@ public class NamespaceUpgrade implements Tool {
meta.flushcache();
meta.waitForFlushesAndCompactions();
meta.close();
- metaHLog.closeAndDelete();
+ walFactory.close();
if(region != null) {
region.close();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
index 6df2eab..fc11823 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java
@@ -36,7 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileV1Detector;
@@ -234,6 +235,8 @@ public class UpgradeTo96 extends Configured implements Tool {
LOG.info("Starting Log splitting");
final Path rootDir = FSUtils.getRootDir(getConf());
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ // since this is the singleton, we needn't close it.
+ final WALFactory factory = WALFactory.getInstance(getConf());
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
FileStatus[] regionServerLogDirs = FSUtils.listStatus(fs, logDir);
@@ -244,7 +247,7 @@ public class UpgradeTo96 extends Configured implements Tool {
try {
for (FileStatus regionServerLogDir : regionServerLogDirs) {
// split its log dir, if exists
- HLogSplitter.split(rootDir, regionServerLogDir.getPath(), oldLogDir, fs, getConf());
+ WALSplitter.split(rootDir, regionServerLogDir.getPath(), oldLogDir, fs, getConf(), factory);
}
LOG.info("Successfully completed Log splitting");
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 8acce16..d68d247 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
@@ -50,14 +50,14 @@ import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {
/**
- * A helper to replicate a list of HLog entries using admin protocol.
+ * A helper to replicate a list of WAL entries using admin protocol.
*
* @param admin
* @param entries
* @throws java.io.IOException
*/
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
- final HLog.Entry[] entries) throws IOException {
+ final Entry[] entries) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
@@ -69,14 +69,14 @@ public class ReplicationProtbufUtil {
}
/**
- * Create a new ReplicateWALEntryRequest from a list of HLog entries
+ * Create a new ReplicateWALEntryRequest from a list of WAL entries
*
- * @param entries the HLog entries to be replicated
+ * @param entries the WAL entries to be replicated
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
* found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
- buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
+ buildReplicateWALEntryRequest(final Entry[] entries) {
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
int size = 0;
@@ -85,11 +85,11 @@ public class ReplicationProtbufUtil {
AdminProtos.ReplicateWALEntryRequest.Builder builder =
AdminProtos.ReplicateWALEntryRequest.newBuilder();
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
- for (HLog.Entry entry: entries) {
+ for (Entry entry: entries) {
entryBuilder.clear();
- // TODO: this duplicates a lot in HLogKey#getBuilder
+ // TODO: this duplicates a lot in WALKey#getBuilder
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
- HLogKey key = entry.getKey();
+ WALKey key = entry.getKey();
keyBuilder.setEncodedRegionName(
ByteStringer.wrap(key.getEncodedRegionName()));
keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 288825f..9874683 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -127,12 +128,15 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Flus
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -226,13 +230,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
protected volatile long lastFlushSeqId = -1L;
/**
- * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
+ * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
* file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
* Its default value is -1L. This default is used as a marker to indicate
* that the region hasn't opened yet. Once it is opened, it is set to the derived
* {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
*
- * <p>Control of this sequence is handed off to the WAL/HLog implementation. It is responsible
+ * <p>Control of this sequence is handed off to the WAL implementation. It is responsible
* for tagging edits with the correct sequence id since it is responsible for getting the
* edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
* OUTSIDE OF THE WAL. The value you get will not be what you think it is.
@@ -298,7 +302,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
- private final HLog log;
+ private final WAL wal;
private final HRegionFileSystem fs;
protected final Configuration conf;
private final Configuration baseConf;
@@ -346,7 +350,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
//
// Context: During replay we want to ensure that we do not lose any data. So, we
- // have to be conservative in how we replay logs. For each store, we calculate
+ // have to be conservative in how we replay wals. For each store, we calculate
// the maxSeqId up to which the store was flushed. And, skip the edits which
// are equal to or lower than maxSeqId for each store.
// The following map is populated when opening the region
@@ -546,11 +550,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous wal file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
@@ -561,11 +564,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rsServices reference to {@link RegionServerServices} or null
*/
@Deprecated
- public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
+ public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
final Configuration confParam, final HRegionInfo regionInfo,
final HTableDescriptor htd, final RegionServerServices rsServices) {
this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
- log, confParam, htd, rsServices);
+ wal, confParam, htd, rsServices);
}
/**
@@ -574,18 +577,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
*
* @param fs is the filesystem.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous wal file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param confParam is global configuration settings.
* @param htd the table descriptor
* @param rsServices reference to {@link RegionServerServices} or null
*/
- public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
+ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
final HTableDescriptor htd, final RegionServerServices rsServices) {
if (htd == null) {
throw new IllegalArgumentException("Need table descriptor");
@@ -596,7 +598,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
this.comparator = fs.getRegionInfo().getComparator();
- this.log = log;
+ this.wal = wal;
this.fs = fs;
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
@@ -772,14 +774,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.splitPolicy = RegionSplitPolicy.create(this, conf);
this.lastFlushTime = EnvironmentEdgeManager.currentTime();
- // Use maximum of log sequenceid or that which was found in stores
+ // Use maximum of wal sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1;
if (this.isRecovering) {
// In distributedLogReplay mode, we don't know the last change sequence number because region
// is opened before recovery completes. So we add a safety bumper to avoid new sequence number
// overlaps used sequence numbers
- nextSeqid = HLogUtil.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
+ nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
}
@@ -872,7 +874,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return maxSeqId;
}
- private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
+ private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
@@ -887,11 +889,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
getSequenceId());
}
- private void writeRegionCloseMarker(HLog log) throws IOException {
+ private void writeRegionCloseMarker(WAL wal) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
@@ -906,7 +908,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
getRegionServerServices().getServerName(), storeFiles);
- HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
getSequenceId());
}
@@ -1043,7 +1045,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
boolean wasRecovering = this.isRecovering;
this.isRecovering = newState;
if (wasRecovering && !isRecovering) {
- // Call only when log replay is over.
+ // Call only when wal replay is over.
coprocessorHost.postLogReplay();
}
}
@@ -1289,8 +1291,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
status.setStatus("Writing region close event to WAL");
- if (!abort && log != null && getRegionServerServices() != null) {
- writeRegionCloseMarker(log);
+ if (!abort && wal != null && getRegionServerServices() != null) {
+ writeRegionCloseMarker(wal);
}
this.closed.set(true);
@@ -1415,9 +1417,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this.htableDescriptor;
}
- /** @return HLog in use for this region */
- public HLog getLog() {
- return this.log;
+ /** @return WAL in use for this region */
+ public WAL getWAL() {
+ return this.wal;
}
/**
@@ -1623,7 +1625,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return true if the region needs compacting
*
* @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
public FlushResult flushcache() throws IOException {
@@ -1721,7 +1723,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
- * memstore, all of which have also been written to the log. We need to write those updates in the
+ * memstore, all of which have also been written to the wal. We need to write those updates in the
* memstore out to disk, while being able to process reads/writes as much as possible during the
* flush operation.
* <p>This method may block for some time. Every time you call it, we up the regions
@@ -1732,24 +1734,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return object describing the flush's state
*
* @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
protected FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
- return internalFlushcache(this.log, -1, status);
+ return internalFlushcache(this.wal, -1, status);
}
/**
- * @param wal Null if we're NOT to go via hlog/wal.
+ * @param wal Null if we're NOT to go via wal.
* @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
* @return object describing the flush's state
* @throws IOException
* @see #internalFlushcache(MonitoredTask)
*/
protected FlushResult internalFlushcache(
- final HLog wal, final long myseqid, MonitoredTask status)
- throws IOException {
+ final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1763,14 +1764,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.updatesLock.writeLock().lock();
try {
if (this.memstoreSize.get() <= 0) {
- // Presume that if there are still no edits in the memstore, then
- // there are no edits for
- // this region out in the WAL/HLog subsystem so no need to do any
- // trickery clearing out
- // edits in the WAL system. Up the sequence number so the resulting
- // flush id is for
- // sure just beyond the last appended region edit (useful as a marker
- // when bulk loading,
+ // Presume that if there are still no edits in the memstore, then there are no edits for
+ // this region out in the WAL subsystem so no need to do any trickery clearing out
+ // edits in the WAL system. Up the sequence number so the resulting flush id is for
+ // sure just beyond the last appended region edit (useful as a marker when bulk loading,
// etc.)
// wal can be null replaying edits.
if (wal != null) {
@@ -1850,7 +1847,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (wal != null) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
}
@@ -1864,7 +1861,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -1889,12 +1886,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) {
- LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
+ LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
+ StringUtils.stringifyException(ioe));
}
}
- // wait for all in-progress transactions to commit to HLog before
+ // wait for all in-progress transactions to commit to WAL before
// we can start the flush. This prevents
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
@@ -1914,8 +1911,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Any failure from here on out will be catastrophic requiring server
- // restart so hlog content can be replayed and put back into the memstore.
- // Otherwise, the snapshot content while backed up in the hlog, it will not
+ // restart so wal content can be replayed and put back into the memstore.
+ // Otherwise, the snapshot content while backed up in the wal, it will not
// be part of the current running servers state.
boolean compactionRequested = false;
try {
@@ -1948,12 +1945,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memstore.
+ // The wal needs to be replayed so its content is restored to memstore.
// Currently, only a server restart will do this.
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
@@ -1962,7 +1959,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -2016,8 +2013,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return Next sequence number unassociated with any actual edit.
* @throws IOException
*/
- private long getNextSequenceId(final HLog wal) throws IOException {
- HLogKey key = this.appendNoSyncNoAppend(wal, null);
+ private long getNextSequenceId(final WAL wal) throws IOException {
+ WALKey key = this.appendEmptyEdit(wal, null);
return key.getSequenceId();
}
@@ -2349,7 +2346,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
+ private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) {
super(operations);
@@ -2417,7 +2414,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
+ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
@@ -2532,7 +2529,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
int lastIndexExclusive = firstIndex;
boolean success = false;
int noOfPuts = 0, noOfDeletes = 0;
- HLogKey walKey = null;
+ WALKey walKey = null;
long mvccNum = 0;
try {
// ------------------------------------
@@ -2675,7 +2672,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// ------------------------------------
// STEP 3. Write back to memstore
// Write to memstore. It is ok to write to memstore
- // first without updating the HLog because we do not roll
+ // first without updating the WAL because we do not roll
// forward the memstore MVCC. The MVCC will be moved up when
// the complete operation is done. These changes are not yet
// visible to scanners till we update the MVCC. The MVCC is
@@ -2724,10 +2721,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new IOException("Multiple nonces per batch and not in replay");
}
// txid should always increase, so having the one from the last call is ok.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, getSequenceId(), true, null);
walEdit = new WALEdit(isInReplay);
walKey = null;
@@ -2751,18 +2749,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
if(isInReplay) {
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
// -------------------------------
@@ -3165,7 +3164,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Add updates first to the hlog and then add values to memstore.
+ * Add updates first to the wal and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
* @throws IOException
@@ -3297,7 +3296,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Append the given map of family->edits to a WALEdit data structure.
- * This does not write to the HLog itself.
+ * This does not write to the WAL itself.
* @param familyMap map of family->edits
* @param walEdit the destination entry to append into
*/
@@ -3339,11 +3338,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Read the edits log put under this region by wal log splitting process. Put
+ * Read the edits put under this region by wal splitting process. Put
* the recovered edits back up into this region.
*
- * <p>We can ignore any log message that has a sequence ID that's equal to or
- * lower than minSeqId. (Because we know such log messages are already
+ * <p>We can ignore any wal message that has a sequence ID that's equal to or
+ * lower than minSeqId. (Because we know such messages are already
* reflected in the HFiles.)
*
* <p>While this is running we are putting pressure on memory yet we are
@@ -3352,15 +3351,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* that if we're up against global memory limits, we'll not be flagged to flush
* because we are not online. We can't be flushed by usual mechanisms anyways;
* we're not yet online so our relative sequenceids are not yet aligned with
- * HLog sequenceids -- not till we come up online, post processing of split
+ * WAL sequenceids -- not till we come up online, post processing of split
* edits.
*
* <p>But to help relieve memory pressure, at least manage our own heap size
* flushing if are in excess of per-region limits. Flushing, though, we have
- * to be careful and avoid using the regionserver/hlog sequenceid. Its running
+ * to be careful and avoid using the regionserver/wal sequenceid. Its running
* on a different line to whats going on in here in this region context so if we
* crashed replaying these edits, but in the midst had a flush that used the
- * regionserver log with a sequenceid in excess of whats going on in here
+ * regionserver wal with a sequenceid in excess of whats going on in here
* in this region and with its split editlogs, then we could miss edits the
* next time we go to recover. So, we have to flush inline, using seqids that
* make sense in a this single region context only -- until we online.
@@ -3385,7 +3384,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long seqid = minSeqIdForTheRegion;
FileSystem fs = this.fs.getFileSystem();
- NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
+ NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
@@ -3405,7 +3404,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
- String msg = "Maximum sequenceid for this log is " + maxSeqId
+ String msg = "Maximum sequenceid for this wal is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
@@ -3429,7 +3428,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@@ -3460,7 +3459,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/*
* @param edits File of recovered edits.
- * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in log
+ * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal
* must be larger than this to be replayed for each store.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
@@ -3475,17 +3474,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
MonitoredTask status = TaskMonitor.get().createStatus(msg);
FileSystem fs = this.fs.getFileSystem();
- status.setStatus("Opening logs");
- HLog.Reader reader = null;
+ status.setStatus("Opening recovered edits");
+ WAL.Reader reader = null;
try {
- reader = HLogFactory.createReader(fs, edits, conf);
+ reader = WALFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
- HLog.Entry entry;
+ WAL.Entry entry;
Store store = null;
boolean reported_once = false;
ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
@@ -3499,7 +3498,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long lastReport = EnvironmentEdgeManager.currentTime();
while ((entry = reader.next()) != null) {
- HLogKey key = entry.getKey();
+ WALKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (ng != null) { // some test, or nonces disabled
@@ -3541,7 +3540,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (coprocessorHost != null) {
status.setStatus("Running pre-WAL-restore hook in coprocessors");
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
- // if bypass this log entry, ignore it ...
+ // if bypass this wal entry, ignore it ...
continue;
}
}
@@ -3600,9 +3599,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
} catch (EOFException eof) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
msg = "Encountered EOF. Most likely due to Master failure during " +
- "log splitting, so we have this data in another edit. " +
+ "wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, eof);
status.abort(msg);
@@ -3610,7 +3609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
@@ -4429,11 +4428,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* {@link HConstants#REGION_IMPL} configuration property.
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
@@ -4443,7 +4441,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param htd the table descriptor
* @return the new instance
*/
- static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
+ static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
RegionServerServices rsServices) {
try {
@@ -4452,11 +4450,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
(Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
Constructor<? extends HRegion> c =
- regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
+ regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
Configuration.class, HRegionInfo.class, HTableDescriptor.class,
RegionServerServices.class);
- return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
+ return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
} catch (Throwable e) {
// todo: what should I throw here?
throw new IllegalStateException("Could not instantiate a region instance.", e);
@@ -4466,11 +4464,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Convenience method creating new HRegions. Used by createTable and by the
* bootstrap code in the HMaster constructor.
- * Note, this method creates an {@link HLog} for the created region. It
- * needs to be closed explicitly. Use {@link HRegion#getLog()} to get
+ * Note, this method creates an {@link WAL} for the created region. It
+ * needs to be closed explicitly. Use {@link HRegion#getWAL()} to get
* access. <b>When done with a region created using this method, you will
- * need to explicitly close the {@link HLog} it created too; it will not be
- * done for you. Not closing the log will leave at least a daemon thread
+ * need to explicitly close the {@link WAL} it created too; it will not be
+ * done for you. Not closing the wal will leave at least a daemon thread
* running.</b> Call {@link #closeHRegion(HRegion)} and it will do
* necessary cleanup for you.
* @param info Info for region to create.
@@ -4489,27 +4487,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* This will do the necessary cleanup a call to
* {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
* requires. This method will close the region and then close its
- * associated {@link HLog} file. You use it if you call the other createHRegion,
- * the one that takes an {@link HLog} instance but don't be surprised by the
- * call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
+ * associated {@link WAL} file. You can still use it if you call the other createHRegion,
+ * the one that takes an {@link WAL} instance but don't be surprised by the
+ * call to the {@link WAL#close()} on the {@link WAL} the
* HRegion was carrying.
* @throws IOException
*/
public static void closeHRegion(final HRegion r) throws IOException {
if (r == null) return;
r.close();
- if (r.getLog() == null) return;
- r.getLog().closeAndDelete();
+ if (r.getWAL() == null) return;
+ r.getWAL().close();
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed explicitly.
- * Use {@link HRegion#getLog()} to get access.
+ * The {@link WAL} for the created region needs to be closed explicitly.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
* @return new HRegion
*
@@ -4518,72 +4516,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
+ final WAL wal,
final boolean initialize)
throws IOException {
return createHRegion(info, rootDir, conf, hTableDescriptor,
- hlog, initialize, false);
+ wal, initialize, false);
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
+ * The {@link WAL} for the created region needs to be closed
* explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
- * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+ * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final WAL wal,
+ final boolean initialize, final boolean ignoreWAL)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
- return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
+ return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
+ ignoreWAL);
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
+ * The {@link WAL} for the created region needs to be closed
* explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
* @param tableDir table directory
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
- * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+ * @param ignoreWAL - true to skip generate new wal if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final WAL wal,
+ final boolean initialize, final boolean ignoreWAL)
throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
FileSystem fs = FileSystem.get(conf);
- HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
- HLog effectiveHLog = hlog;
- if (hlog == null && !ignoreHLog) {
- effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
- HConstants.HREGION_LOGDIR_NAME, conf);
+ HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
+ WAL effectiveWAL = wal;
+ if (wal == null && !ignoreWAL) {
+ // TODO HBASE-11983 There'll be no roller for this wal?
+ // The WAL subsystem will use the default rootDir rather than the passed in rootDir
+ // unless I pass along via the conf.
+ Configuration confForWAL = new Configuration(conf);
+ confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
+ effectiveWAL = (new WALFactory(confForWAL,
+ Collections.<WALActionsListener>singletonList(new MetricsWAL()),
+ "hregion-" + RandomStringUtils.randomNumeric(8))).
+ getWAL(info.getEncodedNameAsBytes());
}
HRegion region = HRegion.newHRegion(tableDir,
- effectiveHLog, fs, conf, info, hTableDescriptor, null);
+ effectiveWAL, fs, conf, info, hTableDescriptor, null);
if (initialize) {
- // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
+ // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
// verifying the WALEdits.
region.setSequenceId(region.initialize(null));
}
@@ -4593,25 +4599,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog)
+ final WAL wal)
throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+ return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
}
/**
* Open a Region.
* @param info Info for region to be opened.
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @return new HRegion
*
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal,
+ final HTableDescriptor htd, final WAL wal,
final Configuration conf)
throws IOException {
return openHRegion(info, htd, wal, conf, null, null);
@@ -4621,9 +4627,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* Open a Region.
* @param info Info for region to be opened
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @param rsServices An interface we can request flushes against.
@@ -4633,7 +4639,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf,
+ final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
@@ -4645,16 +4651,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf)
+ final HTableDescriptor htd, final WAL wal, final Configuration conf)
throws IOException {
return openHRegion(rootDir, info, htd, wal, conf, null, null);
}
@@ -4664,9 +4670,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @param rsServices An interface we can request flushes against.
@@ -4675,7 +4681,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf,
+ final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
@@ -4696,15 +4702,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
+ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
throws IOException {
return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
}
@@ -4716,9 +4722,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
@@ -4726,7 +4732,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
@@ -4740,9 +4746,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
@@ -4750,8 +4756,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
- final RegionServerServices rsServices, final CancelableProgressable reporter)
+ final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
+ final WAL wal, final RegionServerServices rsServices,
+ final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
@@ -4772,7 +4779,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
throws IOException {
HRegionFileSystem regionFs = other.getRegionFileSystem();
- HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
+ HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
return r.openHRegion(reporter);
}
@@ -4789,8 +4796,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
- if (log != null && getRegionServerServices() != null) {
- writeRegionOpenMarker(log, openSeqNum);
+ if (wal != null && getRegionServerServices() != null) {
+ writeRegionOpenMarker(wal, openSeqNum);
}
return this;
}
@@ -4812,7 +4819,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
fs.commitDaughterRegion(hri);
// Create the daughter HRegion instance
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
+ HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
this.getBaseConf(), hri, this.getTableDesc(), rsServices);
r.readRequestsCount.set(this.getReadRequestsCount() / 2);
r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
@@ -4827,7 +4834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
final HRegion region_b) throws IOException {
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
+ HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
this.getTableDesc(), this.rsServices);
r.readRequestsCount.set(this.getReadRequestsCount()
@@ -5174,7 +5181,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
- HLogKey walKey = null;
+ WALKey walKey = null;
try {
// 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -5219,16 +5226,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long txid = 0;
// 8. Append no sync
if (!walEdit.isEmpty()) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, getSequenceId(), true, memstoreCells);
}
if(walKey == null){
- // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
// 9. Release region lock
@@ -5368,7 +5376,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.writeRequestsCount.increment();
long mvccNum = 0;
WriteEntry w = null;
- HLogKey walKey = null;
+ WALKey walKey = null;
RowLock rowLock = null;
List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false;
@@ -5473,7 +5481,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- //store the kvs to the temporary memstore before writing HLog
+ //store the kvs to the temporary memstore before writing WAL
tempMemstore.put(store, kvs);
}
@@ -5501,16 +5509,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
this.sequenceId, true, memstoreCells);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -5585,7 +5594,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.writeRequestsCount.increment();
RowLock rowLock = null;
WriteEntry w = null;
- HLogKey walKey = null;
+ WALKey walKey = null;
long mvccNum = 0;
List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false;
@@ -5692,7 +5701,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- //store the kvs to the temporary memstore before writing HLog
+ //store the kvs to the temporary memstore before writing WAL
if (!kvs.isEmpty()) {
tempMemstore.put(store, kvs);
}
@@ -5726,9 +5735,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdits, getSequenceId(), true, memstoreCells);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
@@ -5736,7 +5746,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
} finally {
this.updatesLock.readLock().unlock();
@@ -5942,13 +5952,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
private static void processTable(final FileSystem fs, final Path p,
- final HLog log, final Configuration c,
+ final WALFactory walFactory, final Configuration c,
final boolean majorCompact)
throws IOException {
HRegion region;
// Currently expects tables have one region only.
if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
- region = HRegion.newHRegion(p, log, fs, c,
+ final WAL wal = walFactory.getMetaWAL(
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+ region = HRegion.newHRegion(p, wal, fs, c,
HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
} else {
throw new IOException("Not a known catalog table: " + p.toString());
@@ -6248,13 +6260,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
private void syncOrDefer(long txid, Durability durability) throws IOException {
if (this.getRegionInfo().isMetaRegion()) {
- this.log.sync(txid);
+ this.wal.sync(txid);
} else {
switch(durability) {
case USE_DEFAULT:
// do what table defaults to
- if (shouldSyncLog()) {
- this.log.sync(txid);
+ if (shouldSyncWAL()) {
+ this.wal.sync(txid);
}
break;
case SKIP_WAL:
@@ -6266,16 +6278,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
case SYNC_WAL:
case FSYNC_WAL:
// sync the WAL edit (SYNC and FSYNC treated the same for now)
- this.log.sync(txid);
+ this.wal.sync(txid);
break;
}
}
}
/**
- * Check whether we should sync the log from the table's durability settings
+ * Check whether we should sync the wal from the table's durability settings
*/
- private boolean shouldSyncLog() {
+ private boolean shouldSyncWAL() {
return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
}
@@ -6329,13 +6341,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final Configuration c = HBaseConfiguration.create();
final FileSystem fs = FileSystem.get(c);
final Path logdir = new Path(c.get("hbase.tmp.dir"));
- final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
+ final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
- final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
+ final Configuration walConf = new Configuration(c);
+ FSUtils.setRootDir(walConf, logdir);
+ final WALFactory wals = new WALFactory(walConf, null, logname);
try {
- processTable(fs, tableDir, log, c, majorCompact);
+ processTable(fs, tableDir, wals, c, majorCompact);
} finally {
- log.close();
+ wals.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
@@ -6496,20 +6510,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
+ * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
* the WALEdit append later.
* @param wal
* @param cells list of Cells inserted into memstore. Those Cells are passed in order to
- * be updated with right mvcc values(their log sequence number)
+ * be updated with right mvcc values(their wal sequence number)
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
- private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException {
- HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
- HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
+ WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
- wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
+ wal.append(getTableDesc(), getRegionInfo(), key,
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
@@ -6519,8 +6534,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public void syncWal() throws IOException {
- if(this.log != null) {
- this.log.sync();
+ if(this.wal != null) {
+ this.wal.sync();
}
}