You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/11/18 21:05:03 UTC
[8/8] hbase git commit: HBASE-10378 Refactor write-ahead-log
implementation
HBASE-10378 Refactor write-ahead-log implementation
Incompatible changes called out in release notes on jira.
* Cleaned up references to HLog
* Deprecates HLogKey but maintains it for compatibility
- Moves all Writable from WALKey to HLogKey
* Adds utility code to CoprocessorHost to help with evolving Coprocessor APIs
* RSRpcServices roll WAL call now requests the non-meta LogRoller roll all logs
- rolls actually happen asynchronously
- deprecated old api (and noted incompatible behavior change)
- modified api in new Admin interface to reflect lack of return values.
* Moved WAL user facing API to "WAL"
- only 1 sync offered
- WALTrailer removed from API
* make provider used by the WALFactory configurable.
* Move all WAL requests to use opaque ids instead of paths
* WALProvider provides API details for implementers and handles creation of WALs.
* Refactor WALActionsListener to have a basic implementation.
* turn MetricsWAL into a WALActionsListener.
* tests that need FSHLog implementation details use them directly, others just reference provider + factory
- Some tests moved from Large to Medium based on run time.
* pull out wal disabling into its own no-op class
* update region open to delegate to WALFactory
* update performance test tool to allow for multiple regions
* Removed references to meta-specific wals within wal code
- replaced with generic suffixes
- WALFactory maintains a dedicated WALProvider for meta (and so knows about the distinction)
* maintain backwards compat on HLogPrettyPrinter and mark it deprecated.
- made WALPrettyPrinter IA.Private in favor of `bin/hbase wal`
* move WALUtil stuff that's implementation specific to said implementation
- WALUtil now acts as an integration point between the RegionServer and the WAL code.
Incorporates contributions from v.himanshu.
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f5e05eb8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f5e05eb8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f5e05eb8
Branch: refs/heads/master
Commit: f5e05eb836a807e418ebcf432fb0f481b40277ac
Parents: b97c3da
Author: Sean Busbey <bu...@apache.org>
Authored: Sat Aug 2 13:27:36 2014 -0500
Committer: stack <st...@apache.org>
Committed: Tue Nov 18 12:04:41 2014 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/HBaseAdmin.java | 61 +-
.../regionserver/wal/TestMetricsWALSource.java | 32 +
.../org/apache/hadoop/hbase/io/WALLink.java | 69 +
.../hadoop/hbase/mapreduce/WALInputFormat.java | 287 +++
.../hadoop/hbase/regionserver/HRegion.java | 400 ++--
.../handler/WALSplitterHandler.java | 106 +
.../hbase/regionserver/wal/HLogSplitter.java | 2073 ----------------
.../hadoop/hbase/regionserver/wal/WALUtil.java | 101 +
.../ReplicationWALReaderManager.java | 145 ++
.../hadoop/hbase/wal/DefaultWALProvider.java | 369 +++
.../hadoop/hbase/wal/DisabledWALProvider.java | 213 ++
.../java/org/apache/hadoop/hbase/wal/WAL.java | 263 ++
.../org/apache/hadoop/hbase/wal/WALFactory.java | 426 ++++
.../org/apache/hadoop/hbase/wal/WALKey.java | 553 +++++
.../hadoop/hbase/wal/WALPrettyPrinter.java | 407 ++++
.../apache/hadoop/hbase/wal/WALProvider.java | 83 +
.../apache/hadoop/hbase/wal/WALSplitter.java | 2253 ++++++++++++++++++
.../hbase/mapreduce/TestWALRecordReader.java | 269 +++
.../regionserver/wal/InstrumentedLogWriter.java | 43 +
.../hbase/regionserver/wal/TestFSHLog.java | 479 ++++
.../hbase/regionserver/wal/TestProtobufLog.java | 209 ++
.../TestReplicationWALReaderManager.java | 221 ++
.../apache/hadoop/hbase/wal/FaultyFSLog.java | 76 +
.../hbase/wal/TestDefaultWALProvider.java | 333 +++
.../wal/TestDefaultWALProviderWithHLogKey.java | 34 +
.../apache/hadoop/hbase/wal/TestSecureWAL.java | 138 ++
.../apache/hadoop/hbase/wal/TestWALFactory.java | 721 ++++++
.../hadoop/hbase/wal/TestWALFiltering.java | 154 ++
.../apache/hadoop/hbase/wal/TestWALMethods.java | 178 ++
.../hbase/wal/TestWALReaderOnSecureWAL.java | 218 ++
.../apache/hadoop/hbase/wal/TestWALSplit.java | 1309 ++++++++++
.../hbase/wal/TestWALSplitCompressed.java | 36 +
.../hbase/wal/WALPerformanceEvaluation.java | 567 +++++
.../src/main/ruby/shell/commands/wal_roll.rb | 42 +
34 files changed, 10583 insertions(+), 2285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 3c808e8..e30e592 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -2617,37 +2617,60 @@ public class HBaseAdmin implements Admin {
return getTableDescriptorsByTableName(tableNames);
}
+ private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
+ FailedLogCloseException {
+ AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
+ try {
+ return admin.rollWALWriter(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
/**
- * Roll the log writer. That is, start writing log messages to a new file.
+ * Roll the log writer. I.e. when using a file system based write ahead log,
+ * start writing log messages to a new file.
+ *
+ * Note that when talking to a version 1.0+ HBase deployment, the rolling is asynchronous.
+ * This method will return as soon as the roll is requested and the return value will
+ * always be null. Additionally, the named region server may schedule store flushes at the
+ * request of the wal handling the roll request.
+ *
+ * When talking to a 0.98 or older HBase deployment, the rolling is synchronous and the
+ * return value may be either null or a list of encoded region names.
*
* @param serverName
* The servername of the regionserver. A server name is made of host,
* port and startcode. This is mandatory. Here is an example:
* <code> host187.example.com,60020,1289493121758</code>
- * @return If lots of logs, flush the returned regions so next time through
- * we can clean logs. Returns null if nothing to flush. Names are actual
- * region names as returned by {@link HRegionInfo#getEncodedName()}
+ * @return a set of {@link HRegionInfo#getEncodedName()} that would allow the wal to
+ * clean up some underlying files. null if there's nothing to flush.
* @throws IOException if a remote or network exception occurs
* @throws FailedLogCloseException
+ * @deprecated use {@link #rollWALWriter(ServerName)}
*/
- @Override
- public synchronized byte[][] rollHLogWriter(String serverName)
+ @Deprecated
+ public synchronized byte[][] rollHLogWriter(String serverName)
throws IOException, FailedLogCloseException {
ServerName sn = ServerName.valueOf(serverName);
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
- try {
- RollWALWriterResponse response = admin.rollWALWriter(null, request);
- int regionCount = response.getRegionToFlushCount();
- byte[][] regionsToFlush = new byte[regionCount][];
- for (int i = 0; i < regionCount; i++) {
- ByteString region = response.getRegionToFlush(i);
- regionsToFlush[i] = region.toByteArray();
- }
- return regionsToFlush;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ final RollWALWriterResponse response = rollWALWriterImpl(sn);
+ int regionCount = response.getRegionToFlushCount();
+ if (0 == regionCount) {
+ return null;
}
+ byte[][] regionsToFlush = new byte[regionCount][];
+ for (int i = 0; i < regionCount; i++) {
+ ByteString region = response.getRegionToFlush(i);
+ regionsToFlush[i] = region.toByteArray();
+ }
+ return regionsToFlush;
+ }
+
+ @Override
+ public synchronized void rollWALWriter(ServerName serverName)
+ throws IOException, FailedLogCloseException {
+ rollWALWriterImpl(serverName);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java
new file mode 100644
index 0000000..5254198
--- /dev/null
+++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWALSource.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.junit.Test;
+
+public class TestMetricsWALSource {
+
+ @Test(expected=RuntimeException.class)
+ public void testGetInstanceNoHadoopCompat() throws Exception {
+ //This should throw an exception because there is no compat lib on the class path.
+ CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
new file mode 100644
index 0000000..fc5bd5d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * WALLink describes a link to a WAL.
+ *
+ * An wal can be in /hbase/.logs/<server>/<wal>
+ * or it can be in /hbase/.oldlogs/<wal>
+ *
+ * The link checks first in the original path,
+ * if it is not present it fallbacks to the archived path.
+ */
+@InterfaceAudience.Private
+public class WALLink extends FileLink {
+ /**
+ * @param conf {@link Configuration} from which to extract specific archive locations
+ * @param serverName Region Server owner of the log
+ * @param logName WAL file name
+ * @throws IOException on unexpected error.
+ */
+ public WALLink(final Configuration conf,
+ final String serverName, final String logName) throws IOException {
+ this(FSUtils.getRootDir(conf), serverName, logName);
+ }
+
+ /**
+ * @param rootDir Path to the root directory where hbase files are stored
+ * @param serverName Region Server owner of the log
+ * @param logName WAL file name
+ */
+ public WALLink(final Path rootDir, final String serverName, final String logName) {
+ final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
+ setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
+ }
+
+ /**
+ * @param originPath Path to the wal in the log directory
+ * @param archivePath Path to the wal in the archived log directory
+ */
+ public WALLink(final Path originPath, final Path archivePath) {
+ setLocations(originPath, archivePath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
new file mode 100644
index 0000000..02fcbba
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
+ */
+@InterfaceAudience.Public
+public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
+ private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
+
+ public static final String START_TIME_KEY = "wal.start.time";
+ public static final String END_TIME_KEY = "wal.end.time";
+
+ /**
+ * {@link InputSplit} for {@link WAL} files. Each split represent
+ * exactly one log file.
+ */
+ static class WALSplit extends InputSplit implements Writable {
+ private String logFileName;
+ private long fileSize;
+ private long startTime;
+ private long endTime;
+
+ /** for serialization */
+ public WALSplit() {}
+
+ /**
+ * Represent an WALSplit, i.e. a single WAL file.
+ * Start- and EndTime are managed by the split, so that WAL files can be
+ * filtered before WALEdits are passed to the mapper(s).
+ * @param logFileName
+ * @param fileSize
+ * @param startTime
+ * @param endTime
+ */
+ public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
+ this.logFileName = logFileName;
+ this.fileSize = fileSize;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return fileSize;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ // TODO: Find the data node with the most blocks for this WAL?
+ return new String[] {};
+ }
+
+ public String getLogFileName() {
+ return logFileName;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ logFileName = in.readUTF();
+ fileSize = in.readLong();
+ startTime = in.readLong();
+ endTime = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(logFileName);
+ out.writeLong(fileSize);
+ out.writeLong(startTime);
+ out.writeLong(endTime);
+ }
+
+ @Override
+ public String toString() {
+ return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
+ }
+ }
+
+ /**
+ * {@link RecordReader} for an {@link WAL} file.
+ * Implementation shared with deprecated HLogInputFormat.
+ */
+ static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
+ private Reader reader = null;
+ // visible until we can remove the deprecated HLogInputFormat
+ Entry currentEntry = new Entry();
+ private long startTime;
+ private long endTime;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ WALSplit hsplit = (WALSplit)split;
+ Path logFile = new Path(hsplit.getLogFileName());
+ Configuration conf = context.getConfiguration();
+ LOG.info("Opening reader for "+split);
+ try {
+ this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
+ } catch (EOFException x) {
+ LOG.info("Ignoring corrupted WAL file: " + logFile
+ + " (This is normal when a RegionServer crashed.)");
+ this.reader = null;
+ }
+ this.startTime = hsplit.getStartTime();
+ this.endTime = hsplit.getEndTime();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (reader == null) return false;
+
+ Entry temp;
+ long i = -1;
+ do {
+ // skip older entries
+ try {
+ temp = reader.next(currentEntry);
+ i++;
+ } catch (EOFException x) {
+ LOG.info("Corrupted entry detected. Ignoring the rest of the file."
+ + " (This is normal when a RegionServer crashed.)");
+ return false;
+ }
+ }
+ while(temp != null && temp.getKey().getWriteTime() < startTime);
+
+ if (temp == null) {
+ if (i > 0) LOG.info("Skipped " + i + " entries.");
+ LOG.info("Reached end of file.");
+ return false;
+ } else if (i > 0) {
+ LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
+ }
+ boolean res = temp.getKey().getWriteTime() <= endTime;
+ if (!res) {
+ LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
+ }
+ return res;
+ }
+
+ @Override
+ public WALEdit getCurrentValue() throws IOException, InterruptedException {
+ return currentEntry.getEdit();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // N/A depends on total number of entries, which is unknown
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Closing reader");
+ if (reader != null) this.reader.close();
+ }
+ }
+
+ /**
+ * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer
+ * need to support HLogInputFormat.
+ */
+ static class WALKeyRecordReader extends WALRecordReader<WALKey> {
+ @Override
+ public WALKey getCurrentKey() throws IOException, InterruptedException {
+ return currentEntry.getKey();
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException,
+ InterruptedException {
+ return getSplits(context, START_TIME_KEY, END_TIME_KEY);
+ }
+
+ /**
+ * implementation shared with deprecated HLogInputFormat
+ */
+ List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
+
+ long startTime = conf.getLong(startKey, Long.MIN_VALUE);
+ long endTime = conf.getLong(endKey, Long.MAX_VALUE);
+
+ FileSystem fs = inputDir.getFileSystem(conf);
+ List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
+
+ List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
+ for (FileStatus file : files) {
+ splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
+ }
+ return splits;
+ }
+
+ private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
+ throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ LOG.debug("Scanning " + dir.toString() + " for WAL files");
+
+ FileStatus[] files = fs.listStatus(dir);
+ if (files == null) return Collections.emptyList();
+ for (FileStatus file : files) {
+ if (file.isDirectory()) {
+ // recurse into sub directories
+ result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
+ } else {
+ String name = file.getPath().toString();
+ int idx = name.lastIndexOf('.');
+ if (idx > 0) {
+ try {
+ long fileStartTime = Long.parseLong(name.substring(idx+1));
+ if (fileStartTime <= endTime) {
+ LOG.info("Found: " + name);
+ result.add(file);
+ }
+ } catch (NumberFormatException x) {
+ idx = 0;
+ }
+ }
+ if (idx == 0) {
+ LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return new WALKeyRecordReader();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d7f83c9..d6a0731 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -127,12 +128,15 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Flus
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -227,13 +231,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
protected volatile long lastFlushSeqId = -1L;
/**
- * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
+ * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
* file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
* Its default value is -1L. This default is used as a marker to indicate
* that the region hasn't opened yet. Once it is opened, it is set to the derived
* {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
*
- * <p>Control of this sequence is handed off to the WAL/HLog implementation. It is responsible
+ * <p>Control of this sequence is handed off to the WAL implementation. It is responsible
* for tagging edits with the correct sequence id since it is responsible for getting the
* edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
* OUTSIDE OF THE WAL. The value you get will not be what you think it is.
@@ -289,7 +293,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
- private final HLog log;
+ private final WAL wal;
private final HRegionFileSystem fs;
protected final Configuration conf;
private final Configuration baseConf;
@@ -337,7 +341,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
//
// Context: During replay we want to ensure that we do not lose any data. So, we
- // have to be conservative in how we replay logs. For each store, we calculate
+ // have to be conservative in how we replay wals. For each store, we calculate
// the maxSeqId up to which the store was flushed. And, skip the edits which
// are equal to or lower than maxSeqId for each store.
// The following map is populated when opening the region
@@ -535,11 +539,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous wal file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
@@ -550,11 +553,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rsServices reference to {@link RegionServerServices} or null
*/
@Deprecated
- public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
+ public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
final Configuration confParam, final HRegionInfo regionInfo,
final HTableDescriptor htd, final RegionServerServices rsServices) {
this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
- log, confParam, htd, rsServices);
+ wal, confParam, htd, rsServices);
}
/**
@@ -563,18 +566,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
*
* @param fs is the filesystem.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous wal file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param confParam is global configuration settings.
* @param htd the table descriptor
* @param rsServices reference to {@link RegionServerServices} or null
*/
- public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
+ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
final HTableDescriptor htd, final RegionServerServices rsServices) {
if (htd == null) {
throw new IllegalArgumentException("Need table descriptor");
@@ -585,7 +587,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
this.comparator = fs.getRegionInfo().getComparator();
- this.log = log;
+ this.wal = wal;
this.fs = fs;
// 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
@@ -761,14 +763,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.splitPolicy = RegionSplitPolicy.create(this, conf);
this.lastFlushTime = EnvironmentEdgeManager.currentTime();
- // Use maximum of log sequenceid or that which was found in stores
+ // Use maximum of wal sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1;
if (this.isRecovering) {
// In distributedLogReplay mode, we don't know the last change sequence number because region
// is opened before recovery completes. So we add a safety bumper to avoid new sequence number
// overlaps used sequence numbers
- nextSeqid = HLogUtil.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
+ nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
}
@@ -861,7 +863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return maxSeqId;
}
- private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
+ private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
@@ -876,11 +878,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
getSequenceId());
}
- private void writeRegionCloseMarker(HLog log) throws IOException {
+ private void writeRegionCloseMarker(WAL wal) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
@@ -895,7 +897,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
getRegionServerServices().getServerName(), storeFiles);
- HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
+ WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
getSequenceId());
}
@@ -1032,7 +1034,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
boolean wasRecovering = this.isRecovering;
this.isRecovering = newState;
if (wasRecovering && !isRecovering) {
- // Call only when log replay is over.
+ // Call only when wal replay is over.
coprocessorHost.postLogReplay();
}
}
@@ -1278,8 +1280,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
status.setStatus("Writing region close event to WAL");
- if (!abort && log != null && getRegionServerServices() != null) {
- writeRegionCloseMarker(log);
+ if (!abort && wal != null && getRegionServerServices() != null) {
+ writeRegionCloseMarker(wal);
}
this.closed.set(true);
@@ -1404,9 +1406,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return this.htableDescriptor;
}
- /** @return HLog in use for this region */
- public HLog getLog() {
- return this.log;
+ /** @return WAL in use for this region */
+ public WAL getWAL() {
+ return this.wal;
}
/**
@@ -1612,7 +1614,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return true if the region needs compacting
*
* @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
public FlushResult flushcache() throws IOException {
@@ -1710,7 +1712,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
- * memstore, all of which have also been written to the log. We need to write those updates in the
+ * memstore, all of which have also been written to the wal. We need to write those updates in the
* memstore out to disk, while being able to process reads/writes as much as possible during the
* flush operation.
* <p>This method may block for some time. Every time you call it, we up the regions
@@ -1721,24 +1723,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return object describing the flush's state
*
* @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
protected FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
- return internalFlushcache(this.log, -1, status);
+ return internalFlushcache(this.wal, -1, status);
}
/**
- * @param wal Null if we're NOT to go via hlog/wal.
+ * @param wal Null if we're NOT to go via wal.
* @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
* @return object describing the flush's state
* @throws IOException
* @see #internalFlushcache(MonitoredTask)
*/
protected FlushResult internalFlushcache(
- final HLog wal, final long myseqid, MonitoredTask status)
- throws IOException {
+ final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1753,7 +1754,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
if (this.memstoreSize.get() <= 0) {
// Presume that if there are still no edits in the memstore, then there are no edits for
- // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
+ // this region out in the WAL subsystem so no need to do any trickery clearing out
// edits in the WAL system. Up the sequence number so the resulting flush id is for
// sure just beyond the last appended region edit (useful as a marker when bulk loading,
// etc.)
@@ -1835,7 +1836,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (wal != null) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
}
@@ -1849,7 +1850,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -1874,12 +1875,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) {
- LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
+ LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
+ StringUtils.stringifyException(ioe));
}
}
- // wait for all in-progress transactions to commit to HLog before
+ // wait for all in-progress transactions to commit to WAL before
// we can start the flush. This prevents
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
@@ -1899,8 +1900,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Any failure from here on out will be catastrophic requiring server
- // restart so hlog content can be replayed and put back into the memstore.
- // Otherwise, the snapshot content while backed up in the hlog, it will not
+ // restart so wal content can be replayed and put back into the memstore.
+ // Otherwise the snapshot content, while backed up in the wal, will not
// be part of the current running servers state.
boolean compactionRequested = false;
try {
@@ -1933,12 +1934,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memstore.
+ // The wal needs to be replayed so its content is restored to memstore.
// Currently, only a server restart will do this.
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
@@ -1947,7 +1948,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
- HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
+ WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -2001,8 +2002,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return Next sequence number unassociated with any actual edit.
* @throws IOException
*/
- private long getNextSequenceId(final HLog wal) throws IOException {
- HLogKey key = this.appendNoSyncNoAppend(wal, null);
+ private long getNextSequenceId(final WAL wal) throws IOException {
+ WALKey key = this.appendEmptyEdit(wal, null);
return key.getSequenceId();
}
@@ -2334,7 +2335,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
+ private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) {
super(operations);
@@ -2402,7 +2403,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
+ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
@@ -2517,7 +2518,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
int lastIndexExclusive = firstIndex;
boolean success = false;
int noOfPuts = 0, noOfDeletes = 0;
- HLogKey walKey = null;
+ WALKey walKey = null;
long mvccNum = 0;
try {
// ------------------------------------
@@ -2660,7 +2661,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// ------------------------------------
// STEP 3. Write back to memstore
// Write to memstore. It is ok to write to memstore
- // first without updating the HLog because we do not roll
+ // first without updating the WAL because we do not roll
// forward the memstore MVCC. The MVCC will be moved up when
// the complete operation is done. These changes are not yet
// visible to scanners till we update the MVCC. The MVCC is
@@ -2709,10 +2710,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new IOException("Multiple nonces per batch and not in replay");
}
// txid should always increase, so having the one from the last call is ok.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey,
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, getSequenceId(), true, null);
walEdit = new WALEdit(isInReplay);
walKey = null;
@@ -2736,18 +2738,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
if(isInReplay) {
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
// -------------------------------
@@ -3149,7 +3152,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Add updates first to the hlog and then add values to memstore.
+ * Add updates first to the wal and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
* @throws IOException
@@ -3276,7 +3279,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Append the given map of family->edits to a WALEdit data structure.
- * This does not write to the HLog itself.
+ * This does not write to the WAL itself.
* @param familyMap map of family->edits
* @param walEdit the destination entry to append into
*/
@@ -3318,11 +3321,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Read the edits log put under this region by wal log splitting process. Put
+ * Read the edits put under this region by wal splitting process. Put
* the recovered edits back up into this region.
*
- * <p>We can ignore any log message that has a sequence ID that's equal to or
- * lower than minSeqId. (Because we know such log messages are already
+ * <p>We can ignore any wal message that has a sequence ID that's equal to or
+ * lower than minSeqId. (Because we know such messages are already
* reflected in the HFiles.)
*
* <p>While this is running we are putting pressure on memory yet we are
@@ -3331,15 +3334,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* that if we're up against global memory limits, we'll not be flagged to flush
* because we are not online. We can't be flushed by usual mechanisms anyways;
* we're not yet online so our relative sequenceids are not yet aligned with
- * HLog sequenceids -- not till we come up online, post processing of split
+ * WAL sequenceids -- not till we come up online, post processing of split
* edits.
*
* <p>But to help relieve memory pressure, at least manage our own heap size
* flushing if are in excess of per-region limits. Flushing, though, we have
- * to be careful and avoid using the regionserver/hlog sequenceid. Its running
+ * to be careful and avoid using the regionserver/wal sequenceid. It's running
* on a different line to whats going on in here in this region context so if we
* crashed replaying these edits, but in the midst had a flush that used the
- * regionserver log with a sequenceid in excess of whats going on in here
+ * regionserver wal with a sequenceid in excess of what's going on in here
* in this region and with its split editlogs, then we could miss edits the
* next time we go to recover. So, we have to flush inline, using seqids that
* make sense in a this single region context only -- until we online.
@@ -3364,7 +3367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long seqid = minSeqIdForTheRegion;
FileSystem fs = this.fs.getFileSystem();
- NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
+ NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
@@ -3384,7 +3387,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
- String msg = "Maximum sequenceid for this log is " + maxSeqId
+ String msg = "Maximum sequenceid for this wal is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
@@ -3407,7 +3410,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@@ -3438,7 +3441,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/*
* @param edits File of recovered edits.
- * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in log
+ * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal
* must be larger than this to be replayed for each store.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
@@ -3453,17 +3456,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
MonitoredTask status = TaskMonitor.get().createStatus(msg);
FileSystem fs = this.fs.getFileSystem();
- status.setStatus("Opening logs");
- HLog.Reader reader = null;
+ status.setStatus("Opening recovered edits");
+ WAL.Reader reader = null;
try {
- reader = HLogFactory.createReader(fs, edits, conf);
+ reader = WALFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
- HLog.Entry entry;
+ WAL.Entry entry;
Store store = null;
boolean reported_once = false;
ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
@@ -3477,7 +3480,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long lastReport = EnvironmentEdgeManager.currentTime();
while ((entry = reader.next()) != null) {
- HLogKey key = entry.getKey();
+ WALKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (ng != null) { // some test, or nonces disabled
@@ -3512,7 +3515,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
if (coprocessorHost != null) {
status.setStatus("Running pre-WAL-restore hook in coprocessors");
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
- // if bypass this log entry, ignore it ...
+ // if bypass this wal entry, ignore it ...
continue;
}
}
@@ -3571,9 +3574,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
} catch (EOFException eof) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
msg = "Encountered EOF. Most likely due to Master failure during " +
- "log splitting, so we have this data in another edit. " +
+ "wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, eof);
status.abort(msg);
@@ -3581,7 +3584,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
- Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
@@ -4400,11 +4403,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* {@link HConstants#REGION_IMPL} configuration property.
* @param tableDir qualified path of directory where region should be located,
* usually the table directory.
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
+ * @param wal The WAL is the outbound log for any updates to the HRegion.
+ * The wal file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
+ * appropriate wal info for this HRegion. If there is a previous file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
@@ -4414,7 +4416,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param htd the table descriptor
* @return the new instance
*/
- static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
+ static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
RegionServerServices rsServices) {
try {
@@ -4423,11 +4425,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
(Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
Constructor<? extends HRegion> c =
- regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
+ regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
Configuration.class, HRegionInfo.class, HTableDescriptor.class,
RegionServerServices.class);
- return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
+ return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
} catch (Throwable e) {
// todo: what should I throw here?
throw new IllegalStateException("Could not instantiate a region instance.", e);
@@ -4437,11 +4439,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
/**
* Convenience method creating new HRegions. Used by createTable and by the
* bootstrap code in the HMaster constructor.
- * Note, this method creates an {@link HLog} for the created region. It
- * needs to be closed explicitly. Use {@link HRegion#getLog()} to get
+ * Note, this method creates a {@link WAL} for the created region. It
+ * needs to be closed explicitly. Use {@link HRegion#getWAL()} to get
* access. <b>When done with a region created using this method, you will
- * need to explicitly close the {@link HLog} it created too; it will not be
- * done for you. Not closing the log will leave at least a daemon thread
+ * need to explicitly close the {@link WAL} it created too; it will not be
+ * done for you. Not closing the wal will leave at least a daemon thread
* running.</b> Call {@link #closeHRegion(HRegion)} and it will do
* necessary cleanup for you.
* @param info Info for region to create.
@@ -4460,27 +4462,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* This will do the necessary cleanup a call to
* {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
* requires. This method will close the region and then close its
- * associated {@link HLog} file. You use it if you call the other createHRegion,
- * the one that takes an {@link HLog} instance but don't be surprised by the
- * call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
+ * associated {@link WAL} file. You can still use it if you call the other createHRegion,
+ * the one that takes a {@link WAL} instance but don't be surprised by the
+ * call to the {@link WAL#close()} on the {@link WAL} the
* HRegion was carrying.
* @throws IOException
*/
public static void closeHRegion(final HRegion r) throws IOException {
if (r == null) return;
r.close();
- if (r.getLog() == null) return;
- r.getLog().closeAndDelete();
+ if (r.getWAL() == null) return;
+ r.getWAL().close();
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed explicitly.
- * Use {@link HRegion#getLog()} to get access.
+ * The {@link WAL} for the created region needs to be closed explicitly.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
* @return new HRegion
*
@@ -4489,72 +4491,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
+ final WAL wal,
final boolean initialize)
throws IOException {
return createHRegion(info, rootDir, conf, hTableDescriptor,
- hlog, initialize, false);
+ wal, initialize, false);
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
+ * The {@link WAL} for the created region needs to be closed
* explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
- * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+ * @param ignoreWAL - true to skip generating a new wal if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final WAL wal,
+ final boolean initialize, final boolean ignoreWAL)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
- return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
+ return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
+ ignoreWAL);
}
/**
* Convenience method creating new HRegions. Used by createTable.
- * The {@link HLog} for the created region needs to be closed
+ * The {@link WAL} for the created region needs to be closed
* explicitly, if it is not null.
- * Use {@link HRegion#getLog()} to get access.
+ * Use {@link HRegion#getWAL()} to get access.
*
* @param info Info for region to create.
* @param rootDir Root directory for HBase instance
* @param tableDir table directory
- * @param hlog shared HLog
+ * @param wal shared WAL
* @param initialize - true to initialize the region
- * @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
+ * @param ignoreWAL - true to skip generating a new wal if it is null, mostly for createTable
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog,
- final boolean initialize, final boolean ignoreHLog)
+ final WAL wal,
+ final boolean initialize, final boolean ignoreWAL)
throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
FileSystem fs = FileSystem.get(conf);
- HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
- HLog effectiveHLog = hlog;
- if (hlog == null && !ignoreHLog) {
- effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
- HConstants.HREGION_LOGDIR_NAME, conf);
+ HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
+ WAL effectiveWAL = wal;
+ if (wal == null && !ignoreWAL) {
+ // TODO HBASE-11983 There'll be no roller for this wal?
+ effectiveWAL = (new WALFactory(conf,
+ Collections.<WALActionsListener>singletonList(new MetricsWAL()),
+ "hregion-" + RandomStringUtils.random(8))).getWAL(info.getEncodedNameAsBytes());
}
HRegion region = HRegion.newHRegion(tableDir,
- effectiveHLog, fs, conf, info, hTableDescriptor, null);
+ effectiveWAL, fs, conf, info, hTableDescriptor, null);
if (initialize) {
- // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
+ // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
// verifying the WALEdits.
region.setSequenceId(region.initialize(null));
}
@@ -4564,25 +4569,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
- final HLog hlog)
+ final WAL wal)
throws IOException {
- return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+ return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
}
/**
* Open a Region.
* @param info Info for region to be opened.
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @return new HRegion
*
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal,
+ final HTableDescriptor htd, final WAL wal,
final Configuration conf)
throws IOException {
return openHRegion(info, htd, wal, conf, null, null);
@@ -4592,9 +4597,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* Open a Region.
* @param info Info for region to be opened
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @param rsServices An interface we can request flushes against.
@@ -4604,7 +4609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf,
+ final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
@@ -4616,16 +4621,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf)
+ final HTableDescriptor htd, final WAL wal, final Configuration conf)
throws IOException {
return openHRegion(rootDir, info, htd, wal, conf, null, null);
}
@@ -4635,9 +4640,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param conf The Configuration object to use.
* @param rsServices An interface we can request flushes against.
@@ -4646,7 +4651,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
- final HTableDescriptor htd, final HLog wal, final Configuration conf,
+ final HTableDescriptor htd, final WAL wal, final Configuration conf,
final RegionServerServices rsServices,
final CancelableProgressable reporter)
throws IOException {
@@ -4667,15 +4672,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @return new HRegion
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
+ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
throws IOException {
return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
}
@@ -4687,9 +4692,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
@@ -4697,7 +4702,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+ final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
@@ -4711,9 +4716,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
- * @param wal HLog for region to use. This method will call
- * HLog#setSequenceNumber(long) passing the result of the call to
- * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * @param wal WAL for region to use. This method will call
+ * WAL#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the wal id is properly kept
* up. HRegionStore does this every time it opens a new region.
* @param rsServices An interface we can request flushes against.
* @param reporter An interface we can report progress against.
@@ -4721,8 +4726,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
- final RegionServerServices rsServices, final CancelableProgressable reporter)
+ final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
+ final WAL wal, final RegionServerServices rsServices,
+ final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
@@ -4743,7 +4749,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
throws IOException {
HRegionFileSystem regionFs = other.getRegionFileSystem();
- HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
+ HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
return r.openHRegion(reporter);
}
@@ -4760,8 +4766,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
- if (log != null && getRegionServerServices() != null) {
- writeRegionOpenMarker(log, openSeqNum);
+ if (wal != null && getRegionServerServices() != null) {
+ writeRegionOpenMarker(wal, openSeqNum);
}
return this;
}
@@ -4783,7 +4789,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
fs.commitDaughterRegion(hri);
// Create the daughter HRegion instance
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
+ HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
this.getBaseConf(), hri, this.getTableDesc(), rsServices);
r.readRequestsCount.set(this.getReadRequestsCount() / 2);
r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
@@ -4798,7 +4804,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
final HRegion region_b) throws IOException {
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
+ HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
this.getTableDesc(), this.rsServices);
r.readRequestsCount.set(this.getReadRequestsCount()
@@ -5144,7 +5150,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
- HLogKey walKey = null;
+ WALKey walKey = null;
try {
// 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -5189,16 +5195,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
long txid = 0;
// 8. Append no sync
if (!walEdit.isEmpty()) {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, getSequenceId(), true, memstoreCells);
}
if(walKey == null){
- // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
// to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
// 9. Release region lock
@@ -5338,7 +5345,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.writeRequestsCount.increment();
long mvccNum = 0;
WriteEntry w = null;
- HLogKey walKey = null;
+ WALKey walKey = null;
RowLock rowLock = null;
List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false;
@@ -5442,7 +5449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- //store the kvs to the temporary memstore before writing HLog
+ //store the kvs to the temporary memstore before writing WAL
tempMemstore.put(store, kvs);
}
@@ -5470,16 +5477,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
this.sequenceId, true, memstoreCells);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -5554,7 +5562,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
this.writeRequestsCount.increment();
RowLock rowLock = null;
WriteEntry w = null;
- HLogKey walKey = null;
+ WALKey walKey = null;
long mvccNum = 0;
List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false;
@@ -5661,7 +5669,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
- //store the kvs to the temporary memstore before writing HLog
+ //store the kvs to the temporary memstore before writing WAL
if (!kvs.isEmpty()) {
tempMemstore.put(store, kvs);
}
@@ -5695,9 +5703,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
- txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
+ txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdits, getSequenceId(), true, memstoreCells);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
@@ -5705,7 +5714,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
if(walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
- walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+ walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
} finally {
this.updatesLock.readLock().unlock();
@@ -5911,14 +5920,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
private static void processTable(final FileSystem fs, final Path p,
- final HLog log, final Configuration c,
+ final WALFactory walFactory, final Configuration c,
final boolean majorCompact)
throws IOException {
HRegion region;
FSTableDescriptors fst = new FSTableDescriptors(c);
// Currently expects tables have one region only.
if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
- region = HRegion.newHRegion(p, log, fs, c,
+ final WAL wal = walFactory.getMetaWAL(
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+ region = HRegion.newHRegion(p, wal, fs, c,
HRegionInfo.FIRST_META_REGIONINFO,
fst.get(TableName.META_TABLE_NAME), null);
} else {
@@ -6219,13 +6230,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
private void syncOrDefer(long txid, Durability durability) throws IOException {
if (this.getRegionInfo().isMetaRegion()) {
- this.log.sync(txid);
+ this.wal.sync(txid);
} else {
switch(durability) {
case USE_DEFAULT:
// do what table defaults to
- if (shouldSyncLog()) {
- this.log.sync(txid);
+ if (shouldSyncWAL()) {
+ this.wal.sync(txid);
}
break;
case SKIP_WAL:
@@ -6237,16 +6248,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
case SYNC_WAL:
case FSYNC_WAL:
// sync the WAL edit (SYNC and FSYNC treated the same for now)
- this.log.sync(txid);
+ this.wal.sync(txid);
break;
}
}
}
/**
- * Check whether we should sync the log from the table's durability settings
+ * Check whether we should sync the wal from the table's durability settings
*/
- private boolean shouldSyncLog() {
+ private boolean shouldSyncWAL() {
return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
}
@@ -6300,13 +6311,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
final Configuration c = HBaseConfiguration.create();
final FileSystem fs = FileSystem.get(c);
final Path logdir = new Path(c.get("hbase.tmp.dir"));
- final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
+ final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
- final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
+ final Configuration walConf = new Configuration(c);
+ FSUtils.setRootDir(walConf, logdir);
+ final WALFactory wals = new WALFactory(walConf, null, logname);
try {
- processTable(fs, tableDir, log, c, majorCompact);
+ processTable(fs, tableDir, wals, c, majorCompact);
} finally {
- log.close();
+ wals.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
@@ -6474,20 +6487,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
- * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
+ * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
* the WALEdit append later.
* @param wal
* @param cells list of Cells inserted into memstore. Those Cells are passed in order to
- * be updated with right mvcc values(their log sequence number)
+ * be updated with right mvcc values(their wal sequence number)
* @return Return the key used appending with no sync and no append.
* @throws IOException
*/
- private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException {
- HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
- HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
+ WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated
// with any edit and we can be sure it went in after all outstanding appends.
- wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
+ wal.append(getTableDesc(), getRegionInfo(), key,
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
@@ -6497,8 +6511,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @throws IOException
*/
public void syncWal() throws IOException {
- if(this.log != null) {
- this.log.sync();
+ if(this.wal != null) {
+ this.wal.sync();
}
}