You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:53 UTC
[14/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
deleted file mode 100644
index d2bbd27..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import com.google.protobuf.TextFormat;
-
-@InterfaceAudience.Private
-public class HLogUtil {
- static final Log LOG = LogFactory.getLog(HLogUtil.class);
-
- /**
- * Pattern used to validate a HLog file name
- */
- private static final Pattern pattern =
- Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
-
- /**
- * @param filename
- * name of the file to validate
- * @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
- * otherwise
- */
- public static boolean validateHLogFilename(String filename) {
- return pattern.matcher(filename).matches();
- }
-
- /**
- * Construct the HLog directory name
- *
- * @param serverName
- * Server name formatted as described in {@link ServerName}
- * @return the relative HLog directory name, e.g.
- * <code>.logs/1.example.org,60030,12345</code> if
- * <code>serverName</code> passed is
- * <code>1.example.org,60030,12345</code>
- */
- public static String getHLogDirectoryName(final String serverName) {
- StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
- dirName.append("/");
- dirName.append(serverName);
- return dirName.toString();
- }
-
- /**
- * @param regiondir
- * This regions directory in the filesystem.
- * @return The directory that holds recovered edits files for the region
- * <code>regiondir</code>
- */
- public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
- return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
- }
-
- /**
- * Move aside a bad edits file.
- *
- * @param fs
- * @param edits
- * Edits file to move aside.
- * @return The name of the moved aside file.
- * @throws IOException
- */
- public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
- throws IOException {
- Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
- + System.currentTimeMillis());
- if (!fs.rename(edits, moveAsideName)) {
- LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
- }
- return moveAsideName;
- }
-
- /**
- * @param path
- * - the path to analyze. Expected format, if it's in hlog directory:
- * / [base directory for hbase] / hbase / .logs / ServerName /
- * logfile
- * @return null if it's not a log file. Returns the ServerName of the region
- * server that created this log file otherwise.
- */
- public static ServerName getServerNameFromHLogDirectoryName(
- Configuration conf, String path) throws IOException {
- if (path == null
- || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
- return null;
- }
-
- if (conf == null) {
- throw new IllegalArgumentException("parameter conf must be set");
- }
-
- final String rootDir = conf.get(HConstants.HBASE_DIR);
- if (rootDir == null || rootDir.isEmpty()) {
- throw new IllegalArgumentException(HConstants.HBASE_DIR
- + " key not found in conf.");
- }
-
- final StringBuilder startPathSB = new StringBuilder(rootDir);
- if (!rootDir.endsWith("/"))
- startPathSB.append('/');
- startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
- if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
- startPathSB.append('/');
- final String startPath = startPathSB.toString();
-
- String fullPath;
- try {
- fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
- } catch (IllegalArgumentException e) {
- LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
- return null;
- }
-
- if (!fullPath.startsWith(startPath)) {
- return null;
- }
-
- final String serverNameAndFile = fullPath.substring(startPath.length());
-
- if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
- // Either it's a file (not a directory) or it's not a ServerName format
- return null;
- }
-
- Path p = new Path(path);
- return getServerNameFromHLogDirectoryName(p);
- }
-
- /**
- * This function returns region server name from a log file name which is in either format:
- * hdfs://<name node>/hbase/.logs/<server name>-splitting/... or hdfs://<name
- * node>/hbase/.logs/<server name>/...
- * @param logFile
- * @return null if the passed in logFile isn't a valid HLog file path
- */
- public static ServerName getServerNameFromHLogDirectoryName(Path logFile) {
- Path logDir = logFile.getParent();
- String logDirName = logDir.getName();
- if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
- logDir = logFile;
- logDirName = logDir.getName();
- }
- ServerName serverName = null;
- if (logDirName.endsWith(HLog.SPLITTING_EXT)) {
- logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length());
- }
- try {
- serverName = ServerName.parseServerName(logDirName);
- } catch (IllegalArgumentException ex) {
- serverName = null;
- LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
- }
- if (serverName != null && serverName.getStartcode() < 0) {
- LOG.warn("Invalid log file path=" + logFile);
- return null;
- }
- return serverName;
- }
-
- /**
- * Returns sorted set of edit files made by wal-log splitter, excluding files
- * with '.temp' suffix.
- *
- * @param fs
- * @param regiondir
- * @return Files in passed <code>regiondir</code> as a sorted set.
- * @throws IOException
- */
- public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
- final Path regiondir) throws IOException {
- NavigableSet<Path> filesSorted = new TreeSet<Path>();
- Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
- if (!fs.exists(editsdir))
- return filesSorted;
- FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
- @Override
- public boolean accept(Path p) {
- boolean result = false;
- try {
- // Return files and only files that match the editfile names pattern.
- // There can be other files in this directory other than edit files.
- // In particular, on error, we'll move aside the bad edit file giving
- // it a timestamp suffix. See moveAsideBadEditsFile.
- Matcher m = HLog.EDITFILES_NAME_PATTERN.matcher(p.getName());
- result = fs.isFile(p) && m.matches();
- // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
- // because it means splithlog thread is writting this file.
- if (p.getName().endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
- result = false;
- }
- } catch (IOException e) {
- LOG.warn("Failed isFile check on " + p);
- }
- return result;
- }
- });
- if (files == null)
- return filesSorted;
- for (FileStatus status : files) {
- filesSorted.add(status.getPath());
- }
- return filesSorted;
- }
-
- public static boolean isMetaFile(Path p) {
- return isMetaFile(p.getName());
- }
-
- public static boolean isMetaFile(String p) {
- if (p != null && p.endsWith(HLog.META_HLOG_FILE_EXTN)) {
- return true;
- }
- return false;
- }
-
- /**
- * Write the marker that a compaction has succeeded and is about to be committed.
- * This provides info to the HMaster to allow it to recover the compaction if
- * this regionserver dies in the middle (This part is not yet implemented). It also prevents
- * the compaction from finishing if this regionserver has already lost its lease on the log.
- * @param sequenceId Used by HLog to get sequence Id for the waledit.
- */
- public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
- final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
- TableName tn = TableName.valueOf(c.getTableName().toByteArray());
- HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
- log.sync();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
- }
- }
-
- /**
- * Write a flush marker indicating a start / abort or a complete of a region flush
- */
- public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
- final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
- TableName tn = TableName.valueOf(f.getTableName().toByteArray());
- HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null);
- if (sync) log.sync(trx);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
- }
- return trx;
- }
-
- /**
- * Write a region open marker indicating that the region is opened
- */
- public static long writeRegionEventMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
- final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
- TableName tn = TableName.valueOf(r.getTableName().toByteArray());
- HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
- long trx = log.appendNoSync(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
- sequenceId, false, null);
- log.sync(trx);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
- }
- return trx;
- }
-
- /**
- * Create a file with name as region open sequence id
- *
- * @param fs
- * @param regiondir
- * @param newSeqId
- * @param saftyBumper
- * @return long new sequence Id value
- * @throws IOException
- */
- public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir,
- long newSeqId, long saftyBumper) throws IOException {
-
- Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
- long maxSeqId = 0;
- FileStatus[] files = null;
- if (fs.exists(editsdir)) {
- files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
- @Override
- public boolean accept(Path p) {
- if (p.getName().endsWith(HLog.SEQUENCE_ID_FILE_SUFFIX)) {
- return true;
- }
- return false;
- }
- });
- if (files != null) {
- for (FileStatus status : files) {
- String fileName = status.getPath().getName();
- try {
- Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
- - HLog.SEQUENCE_ID_FILE_SUFFIX.length()));
- maxSeqId = Math.max(tmpSeqId, maxSeqId);
- } catch (NumberFormatException ex) {
- LOG.warn("Invalid SeqId File Name=" + fileName);
- }
- }
- }
- }
- if (maxSeqId > newSeqId) {
- newSeqId = maxSeqId;
- }
- newSeqId += saftyBumper; // bump up SeqId
-
- // write a new seqId file
- Path newSeqIdFile = new Path(editsdir, newSeqId + HLog.SEQUENCE_ID_FILE_SUFFIX);
- if (!fs.createNewFile(newSeqIdFile)) {
- throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
- }
- // remove old ones
- if(files != null) {
- for (FileStatus status : files) {
- fs.delete(status.getPath(), false);
- }
- }
- return newSeqId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index cf4b7a6..ad549f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.util.StringUtils;
* single function call and turn it into multiple manipulations of the hadoop metrics system.
*/
@InterfaceAudience.Private
-public class MetricsWAL {
+public class MetricsWAL extends WALActionsListener.Base {
static final Log LOG = LogFactory.getLog(MetricsWAL.class);
private final MetricsWALSource source;
@@ -40,19 +40,20 @@ public class MetricsWAL {
source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
}
- public void finishSync(long time) {
- source.incrementSyncTime(time);
+ @Override
+ public void postSync(final long timeInNanos, final int handlerSyncs) {
+ source.incrementSyncTime(timeInNanos/1000000l);
}
- public void finishAppend(long time, long size) {
-
+ @Override
+ public void postAppend(final long size, final long time) {
source.incrementAppendCount();
source.incrementAppendTime(time);
source.incrementAppendSize(size);
if (time > 1000) {
source.incrementSlowAppendCount();
- LOG.warn(String.format("%s took %d ms appending an edit to hlog; len~=%s",
+ LOG.warn(String.format("%s took %d ms appending an edit to wal; len~=%s",
Thread.currentThread().getName(),
time,
StringUtils.humanReadableInt(size)));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 39f1d9f..285f69b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -31,6 +31,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -58,17 +61,32 @@ import com.google.protobuf.InvalidProtocolBufferException;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
- static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
- static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
+ // public for WALFactory until we move everything to o.a.h.h.wal
+ @InterfaceAudience.Private
+ public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
+ // public for TestWALSplit
+ @InterfaceAudience.Private
+ public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
+ /**
+ * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
+ * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
+ */
+ static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
+ static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
+
protected FSDataInputStream inputStream;
protected Codec.Decoder cellDecoder;
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false;
protected boolean hasTagCompression = false;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
- // in the hlog, the inputstream's position is equal to walEditsStopOffset.
+ // in the wal, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
private boolean trailerPresent;
+ protected WALTrailer trailer;
+ // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
+ // than this size, it is written/read respectively, with a WARN message in the log.
+ protected int trailerWarnSize;
private static List<String> writerClsNames = new ArrayList<String>();
static {
writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
@@ -121,6 +139,13 @@ public class ProtobufLogReader extends ReaderBase {
}
@Override
+ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
+ throws IOException {
+ this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
+ super.init(fs, path, conf, stream);
+ }
+
+ @Override
protected String initReader(FSDataInputStream stream) throws IOException {
return initInternal(stream, true);
}
@@ -268,7 +293,7 @@ public class ProtobufLogReader extends ReaderBase {
}
@Override
- protected boolean readNext(HLog.Entry entry) throws IOException {
+ protected boolean readNext(Entry entry) throws IOException {
while (true) {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
@@ -332,7 +357,7 @@ public class ProtobufLogReader extends ReaderBase {
initCause(realEofEx != null ? realEofEx : ex);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
- LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+ LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ this.walEditsStopOffset);
throw new EOFException("Read WALTrailer while reading WALEdits");
@@ -370,11 +395,6 @@ public class ProtobufLogReader extends ReaderBase {
}
@Override
- public WALTrailer getWALTrailer() {
- return trailer;
- }
-
- @Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index fe2eac9..ca80e4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -34,6 +34,10 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
/**
* Writer for protobuf-based WAL.
@@ -77,8 +81,7 @@ public class ProtobufLogWriter extends WriterBase {
super.init(fs, path, conf, overwritable);
assert this.output == null;
boolean doCompress = initializeCompressionContext(conf, path);
- this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
- HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
+ this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = FSUtils.getDefaultBufferSize(fs);
short replication = (short)conf.getInt(
"hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
@@ -110,7 +113,7 @@ public class ProtobufLogWriter extends WriterBase {
}
@Override
- public void append(HLog.Entry entry) throws IOException {
+ public void append(Entry entry) throws IOException {
entry.setCompressionContext(compressionContext);
entry.getKey().getBuilder(compressor).
setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
@@ -134,7 +137,7 @@ public class ProtobufLogWriter extends WriterBase {
}
}
- protected WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
+ WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
return builder.build();
}
@@ -188,8 +191,7 @@ public class ProtobufLogWriter extends WriterBase {
return this.output;
}
- @Override
- public void setWALTrailer(WALTrailer walTrailer) {
+ void setWALTrailer(WALTrailer walTrailer) {
this.trailer = walTrailer;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 7fe5a81..5f1e904 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -31,21 +31,19 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-public abstract class ReaderBase implements HLog.Reader {
+public abstract class ReaderBase implements DefaultWALProvider.Reader {
private static final Log LOG = LogFactory.getLog(ReaderBase.class);
protected Configuration conf;
protected FileSystem fs;
protected Path path;
protected long edit = 0;
protected long fileLength;
- protected WALTrailer trailer;
- // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
- // than this size, it is written/read respectively, with a WARN message in the log.
- protected int trailerWarnSize;
/**
* Compression context to use reading. Can be null if no compression.
*/
@@ -65,8 +63,6 @@ public abstract class ReaderBase implements HLog.Reader {
this.path = path;
this.fs = fs;
this.fileLength = this.fs.getFileStatus(path).getLen();
- this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
- HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
String cellCodecClsName = initReader(stream);
boolean compression = hasCompression();
@@ -87,15 +83,17 @@ public abstract class ReaderBase implements HLog.Reader {
}
@Override
- public HLog.Entry next() throws IOException {
+ public Entry next() throws IOException {
return next(null);
}
@Override
- public HLog.Entry next(HLog.Entry reuse) throws IOException {
- HLog.Entry e = reuse;
+ public Entry next(Entry reuse) throws IOException {
+ Entry e = reuse;
if (e == null) {
- e = new HLog.Entry(new HLogKey(), new WALEdit());
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors,
+ // seqencefile based readers, and HLogInputFormat.
+ e = new Entry(new HLogKey(), new WALEdit());
}
if (compressionContext != null) {
e.setCompressionContext(compressionContext);
@@ -165,15 +163,10 @@ public abstract class ReaderBase implements HLog.Reader {
* @param e The entry to read into.
* @return Whether there was anything to read.
*/
- protected abstract boolean readNext(HLog.Entry e) throws IOException;
+ protected abstract boolean readNext(Entry e) throws IOException;
/**
* Performs a filesystem-level seek to a certain position in an underlying file.
*/
protected abstract void seekOnFs(long pos) throws IOException;
-
- @Override
- public WALTrailer getWALTrailer() {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
index 985c0bb..03d1608 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
@@ -52,10 +52,10 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter {
builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
// Get an instance of our cipher
- Cipher cipher = Encryption.getCipher(conf,
- conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, DEFAULT_CIPHER));
+ final String cipherName = conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, DEFAULT_CIPHER);
+ Cipher cipher = Encryption.getCipher(conf, cipherName);
if (cipher == null) {
- throw new RuntimeException("Cipher '" + cipher + "' is not available");
+ throw new RuntimeException("Cipher '" + cipherName + "' is not available");
}
// Generate an encryption key for this WAL
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index 128274a..11312b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
@@ -222,10 +222,27 @@ public class SequenceFileLogReader extends ReaderBase {
}
+ /**
+ * fill in the passed entry with teh next key/value.
+ * Note that because this format deals with our legacy storage, the provided
+ * Entery MUST use an {@link HLogKey} for the key.
+ * @return boolean indicating if the contents of Entry have been filled in.
+ */
@Override
protected boolean readNext(Entry e) throws IOException {
try {
- boolean hasNext = this.reader.next(e.getKey(), e.getEdit());
+ if (!(e.getKey() instanceof HLogKey)) {
+ final IllegalArgumentException exception = new IllegalArgumentException(
+ "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" +
+ " one had '" + e.getKey().getClass() + "'");
+ LOG.error("We need to use the legacy SequenceFileLogReader to handle a " +
+ " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." +
+ " This is a bug; please file an issue or email the developer mailing list. You will " +
+ "need the following exception details when seeking help from the HBase community.",
+ exception);
+ throw exception;
+ }
+ boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit());
if (!hasNext) return false;
// Scopes are probably in WAL edit, move to key
NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index eddb92d..2194ce9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.wal.WALKey;
+
/**
- * Get notification of {@link FSHLog}/WAL log events. The invocations are inline
+ * Get notification of WAL events. The invocations are inline
* so make sure your implementation is fast else you'll slow hbase.
*/
@InterfaceAudience.Private
@@ -35,30 +37,30 @@ public interface WALActionsListener {
/**
* The WAL is going to be rolled. The oldPath can be null if this is
* the first log file from the regionserver.
- * @param oldPath the path to the old hlog
- * @param newPath the path to the new hlog
+ * @param oldPath the path to the old wal
+ * @param newPath the path to the new wal
*/
void preLogRoll(Path oldPath, Path newPath) throws IOException;
/**
* The WAL has been rolled. The oldPath can be null if this is
* the first log file from the regionserver.
- * @param oldPath the path to the old hlog
- * @param newPath the path to the new hlog
+ * @param oldPath the path to the old wal
+ * @param newPath the path to the new wal
*/
void postLogRoll(Path oldPath, Path newPath) throws IOException;
/**
* The WAL is going to be archived.
- * @param oldPath the path to the old hlog
- * @param newPath the path to the new hlog
+ * @param oldPath the path to the old wal
+ * @param newPath the path to the new wal
*/
void preLogArchive(Path oldPath, Path newPath) throws IOException;
/**
* The WAL has been archived.
- * @param oldPath the path to the old hlog
- * @param newPath the path to the new hlog
+ * @param oldPath the path to the old wal
+ * @param newPath the path to the new wal
*/
void postLogArchive(Path oldPath, Path newPath) throws IOException;
@@ -79,7 +81,7 @@ public interface WALActionsListener {
* @param logEdit
*/
void visitLogEntryBeforeWrite(
- HRegionInfo info, HLogKey logKey, WALEdit logEdit
+ HRegionInfo info, WALKey logKey, WALEdit logEdit
);
/**
@@ -87,11 +89,59 @@ public interface WALActionsListener {
* @param htd
* @param logKey
* @param logEdit
- * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, HLogKey, WALEdit)}
- * It only exists to get scope when replicating. Scope should be in the HLogKey and not need
+ * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}
+ * It only exists to get scope when replicating. Scope should be in the WALKey and not need
* us passing in a <code>htd</code>.
*/
void visitLogEntryBeforeWrite(
- HTableDescriptor htd, HLogKey logKey, WALEdit logEdit
+ HTableDescriptor htd, WALKey logKey, WALEdit logEdit
);
+
+ /**
+ * For notification post append to the writer. Used by metrics system at least.
+ * TODO: Combine this with above.
+ * @param entryLen approx length of cells in this append.
+ * @param elapsedTimeMillis elapsed time in milliseconds.
+ */
+ void postAppend(final long entryLen, final long elapsedTimeMillis);
+
+ /**
+ * For notification post writer sync. Used by metrics system at least.
+ * @param timeInNanos How long the filesystem sync took in nanoseconds.
+ * @param handlerSyncs How many sync handler calls were released by this call to filesystem
+ * sync.
+ */
+ void postSync(final long timeInNanos, final int handlerSyncs);
+
+ static class Base implements WALActionsListener {
+ @Override
+ public void preLogRoll(Path oldPath, Path newPath) throws IOException {}
+
+ @Override
+ public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
+
+ @Override
+ public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
+
+ @Override
+ public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
+
+ @Override
+ public void logRollRequested() {}
+
+ @Override
+ public void logCloseRequested() {}
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
+
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {}
+
+ @Override
+ public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
+
+ @Override
+ public void postSync(final long timeInNanos, final int handlerSyncs) {}
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index f3927f9..433e5c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -130,7 +130,7 @@ public class WALCellCodec implements Codec {
byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
}
- // TODO: it sucks that compression context is in HLog.Entry. It'd be nice if it was here.
+ // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here.
// Dictionary could be gotten by enum; initially, based on enum, context would create
// an array of dictionaries.
static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
index 521e5f3..52dcee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
@@ -28,9 +28,12 @@ import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+
/**
* Implements the coprocessor environment and runtime support for coprocessors
- * loaded within a {@link FSHLog}.
+ * loaded within a {@link WAL}.
*/
@InterfaceAudience.Private
public class WALCoprocessorHost
@@ -42,10 +45,13 @@ public class WALCoprocessorHost
static class WALEnvironment extends CoprocessorHost.Environment
implements WALCoprocessorEnvironment {
- private FSHLog wal;
+ private final WAL wal;
+
+ final boolean useLegacyPre;
+ final boolean useLegacyPost;
@Override
- public FSHLog getWAL() {
+ public WAL getWAL() {
return wal;
}
@@ -56,23 +62,32 @@ public class WALCoprocessorHost
* @param priority chaining priority
* @param seq load sequence
* @param conf configuration
- * @param hlog HLog
+ * @param wal WAL
*/
public WALEnvironment(Class<?> implClass, final Coprocessor impl,
final int priority, final int seq, final Configuration conf,
- final FSHLog hlog) {
+ final WAL wal) {
super(impl, priority, seq, conf);
- this.wal = hlog;
+ this.wal = wal;
+ // Pick which version of the API we'll call.
+ // This way we avoid calling the new version on older WALObservers so
+ // we can maintain binary compatibility.
+ // See notes in javadoc for WALObserver
+ useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class,
+ HRegionInfo.class, WALKey.class, WALEdit.class);
+ useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class,
+ HRegionInfo.class, WALKey.class, WALEdit.class);
}
}
- FSHLog wal;
+ private final WAL wal;
+
/**
* Constructor
* @param log the write ahead log
* @param conf the configuration
*/
- public WALCoprocessorHost(final FSHLog log, final Configuration conf) {
+ public WALCoprocessorHost(final WAL log, final Configuration conf) {
// We don't want to require an Abortable passed down through (FS)HLog, so
// this means that a failure to load of a WAL coprocessor won't abort the
// server. This isn't ideal, and means that security components that
@@ -100,21 +115,29 @@ public class WALCoprocessorHost
* @return true if default behavior should be bypassed, false otherwise
* @throws IOException
*/
- public boolean preWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+ public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
boolean bypass = false;
if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
ObserverContext<WALCoprocessorEnvironment> ctx = null;
for (WALEnvironment env: coprocessors) {
- if (env.getInstance() instanceof
- org.apache.hadoop.hbase.coprocessor.WALObserver) {
+ if (env.getInstance() instanceof WALObserver) {
+ final WALObserver observer = (WALObserver)env.getInstance();
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
- ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
- preWALWrite(ctx, info, logKey, logEdit);
+ if (env.useLegacyPre) {
+ if (logKey instanceof HLogKey) {
+ observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit);
+ } else {
+ legacyWarning(observer.getClass(),
+ "There are wal keys present that are not HLogKey.");
+ }
+ } else {
+ observer.preWALWrite(ctx, info, logKey, logEdit);
+ }
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
@@ -135,20 +158,28 @@ public class WALCoprocessorHost
* @param logEdit
* @throws IOException
*/
- public void postWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+ public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
ObserverContext<WALCoprocessorEnvironment> ctx = null;
for (WALEnvironment env: coprocessors) {
- if (env.getInstance() instanceof
- org.apache.hadoop.hbase.coprocessor.WALObserver) {
+ if (env.getInstance() instanceof WALObserver) {
+ final WALObserver observer = (WALObserver)env.getInstance();
ctx = ObserverContext.createAndPrepare(env, ctx);
Thread currentThread = Thread.currentThread();
ClassLoader cl = currentThread.getContextClassLoader();
try {
currentThread.setContextClassLoader(env.getClassLoader());
- ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
- postWALWrite(ctx, info, logKey, logEdit);
+ if (env.useLegacyPost) {
+ if (logKey instanceof HLogKey) {
+ observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit);
+ } else {
+ legacyWarning(observer.getClass(),
+ "There are wal keys present that are not HLogKey.");
+ }
+ } else {
+ observer.postWALWrite(ctx, info, logKey, logEdit);
+ }
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 172e478..05cead2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.io.Writable;
* for serializing/deserializing a set of KeyValue items.
*
* Previously, if a transaction contains 3 edits to c1, c2, c3 for a row R,
- * the HLog would have three log entries as follows:
+ * the WAL would have three log entries as follows:
*
* <logseq1-for-edit1>:<KeyValue-for-edit-c1>
* <logseq2-for-edit2>:<KeyValue-for-edit-c2>
@@ -73,7 +73,7 @@ import org.apache.hadoop.io.Writable;
* <-1, 3, <Keyvalue-for-edit-c1>, <KeyValue-for-edit-c2>, <KeyValue-for-edit-c3>>
*
* The -1 marker is just a special way of being backward compatible with
- * an old HLog which would have contained a single <KeyValue>.
+ * an old WAL which would have contained a single <KeyValue>.
*
* The deserializer for WALEdit backward compatibly detects if the record
* is an old style KeyValue or the new style WALEdit.
@@ -168,7 +168,7 @@ public class WALEdit implements Writable, HeapSize {
int versionOrLength = in.readInt();
// TODO: Change version when we protobuf. Also, change way we serialize KV! Pb it too.
if (versionOrLength == VERSION_2) {
- // this is new style HLog entry containing multiple KeyValues.
+ // this is new style WAL entry containing multiple KeyValues.
int numEdits = in.readInt();
for (int idx = 0; idx < numEdits; idx++) {
if (compressionContext != null) {
@@ -189,7 +189,7 @@ public class WALEdit implements Writable, HeapSize {
}
}
} else {
- // this is an old style HLog entry. The int that we just
+ // this is an old style WAL entry. The int that we just
// read is actually the length of a single KeyValue
this.add(KeyValue.create(versionOrLength, in));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 9c0b8a9..ff5f2f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.protobuf.ServiceException;
@@ -96,17 +97,17 @@ public class WALEditsReplaySink {
* @param entries
* @throws IOException
*/
- public void replayEntries(List<Pair<HRegionLocation, HLog.Entry>> entries) throws IOException {
+ public void replayEntries(List<Pair<HRegionLocation, Entry>> entries) throws IOException {
if (entries.size() == 0) {
return;
}
int batchSize = entries.size();
- Map<HRegionInfo, List<HLog.Entry>> entriesByRegion =
- new HashMap<HRegionInfo, List<HLog.Entry>>();
+ Map<HRegionInfo, List<Entry>> entriesByRegion =
+ new HashMap<HRegionInfo, List<Entry>>();
HRegionLocation loc = null;
- HLog.Entry entry = null;
- List<HLog.Entry> regionEntries = null;
+ Entry entry = null;
+ List<Entry> regionEntries = null;
// Build the action list.
for (int i = 0; i < batchSize; i++) {
loc = entries.get(i).getFirst();
@@ -114,7 +115,7 @@ public class WALEditsReplaySink {
if (entriesByRegion.containsKey(loc.getRegionInfo())) {
regionEntries = entriesByRegion.get(loc.getRegionInfo());
} else {
- regionEntries = new ArrayList<HLog.Entry>();
+ regionEntries = new ArrayList<Entry>();
entriesByRegion.put(loc.getRegionInfo(), regionEntries);
}
regionEntries.add(entry);
@@ -123,9 +124,9 @@ public class WALEditsReplaySink {
long startTime = EnvironmentEdgeManager.currentTime();
// replaying edits by region
- for (Map.Entry<HRegionInfo, List<HLog.Entry>> _entry : entriesByRegion.entrySet()) {
+ for (Map.Entry<HRegionInfo, List<Entry>> _entry : entriesByRegion.entrySet()) {
HRegionInfo curRegion = _entry.getKey();
- List<HLog.Entry> allActions = _entry.getValue();
+ List<Entry> allActions = _entry.getValue();
// send edits in chunks
int totalActions = allActions.size();
int replayedActions = 0;
@@ -159,7 +160,7 @@ public class WALEditsReplaySink {
}
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
- final List<HLog.Entry> entries) throws IOException {
+ final List<Entry> entries) throws IOException {
try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
ReplayServerCallable<ReplicateWALEntryResponse> callable =
@@ -182,11 +183,11 @@ public class WALEditsReplaySink {
*/
class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
private HRegionInfo regionInfo;
- private List<HLog.Entry> entries;
+ private List<Entry> entries;
ReplayServerCallable(final HConnection connection, final TableName tableName,
final HRegionLocation regionLoc, final HRegionInfo regionInfo,
- final List<HLog.Entry> entries) {
+ final List<Entry> entries) {
super(connection, tableName, null);
this.entries = entries;
this.regionInfo = regionInfo;
@@ -203,11 +204,11 @@ public class WALEditsReplaySink {
return null;
}
- private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
+ private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
throws IOException, ServiceException {
if (entries.isEmpty()) return;
- HLog.Entry[] entriesArray = new HLog.Entry[entries.size()];
+ Entry[] entriesArray = new Entry[entries.size()];
entriesArray = entries.toArray(entriesArray);
AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
@@ -228,11 +229,11 @@ public class WALEditsReplaySink {
// if not due to connection issue, the following code should run fast because it uses
// cached location
boolean skip = false;
- for (HLog.Entry entry : this.entries) {
+ for (Entry entry : this.entries) {
WALEdit edit = entry.getEdit();
List<Cell> cells = edit.getCells();
for (Cell cell : cells) {
- // filtering HLog meta entries
+ // filtering WAL meta entries
setLocation(conn.locateRegion(tableName, cell.getRow()));
skip = true;
break;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
new file mode 100644
index 0000000..5f00643
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Helper methods to ease Region Server integration with the write ahead log.
+ * Note that methods in this class specifically should not require access to anything
+ * other than the API found in {@link WAL}.
+ */
+@InterfaceAudience.Private
+public class WALUtil {
+ static final Log LOG = LogFactory.getLog(WALUtil.class);
+
+ /**
+ * Write the marker that a compaction has succeeded and is about to be committed.
+ * This provides info to the HMaster to allow it to recover the compaction if
+ * this regionserver dies in the middle (This part is not yet implemented). It also prevents
+ * the compaction from finishing if this regionserver has already lost its lease on the log.
+ * @param sequenceId Used by WAL to get sequence Id for the waledit.
+ */
+ public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
+ final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
+ TableName tn = TableName.valueOf(c.getTableName().toByteArray());
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+ log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
+ log.sync();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
+ }
+ }
+
+ /**
+ * Write a flush marker indicating a start / abort or a complete of a region flush
+ */
+ public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
+ final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
+ TableName tn = TableName.valueOf(f.getTableName().toByteArray());
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+ long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
+ null);
+ if (sync) log.sync(trx);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
+ }
+ return trx;
+ }
+
+ /**
+ * Write a region open marker indicating that the region is opened
+ */
+ public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
+ final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
+ TableName tn = TableName.valueOf(r.getTableName().toByteArray());
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+ long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
+ sequenceId, false, null);
+ log.sync(trx);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
+ }
+ return trx;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
index cd3aeaf..8188e02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
/**
* Context used by our wal dictionary compressor. Null if we're not to do our
* custom dictionary compression.
*/
@InterfaceAudience.Private
-public abstract class WriterBase implements HLog.Writer {
+public abstract class WriterBase implements DefaultWALProvider.Writer {
protected CompressionContext compressionContext;
protected Configuration conf;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 281ba63..6a3981a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* A {@link WALEntryFilter} which contains multiple filters and applies them
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 03b66d2..c3ec976 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import com.google.common.util.concurrent.Service;
@@ -128,13 +128,13 @@ public interface ReplicationEndpoint extends Service {
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class ReplicateContext {
- List<HLog.Entry> entries;
+ List<Entry> entries;
int size;
@InterfaceAudience.Private
public ReplicateContext() {
}
- public ReplicateContext setEntries(List<HLog.Entry> entries) {
+ public ReplicateContext setEntries(List<Entry> entries) {
this.entries = entries;
return this;
}
@@ -142,7 +142,7 @@ public interface ReplicationEndpoint extends Service {
this.size = size;
return this;
}
- public List<HLog.Entry> getEntries() {
+ public List<Entry> getEntries() {
return entries;
}
public int getSize() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 5df7b25..166dc37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -24,7 +24,7 @@ import java.util.NavigableMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Keeps KVs that are scoped other than local
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
index b683ad6..46b8b81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Skips WAL edits for all System tables including META
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index 0ea267d..b892512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -26,7 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.util.Bytes;
public class TableCfWALEntryFilter implements WALEntryFilter {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 60797c9..b66ddde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* A Filter for WAL entries before being sent over to replication. Multiple
@@ -30,12 +30,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
public interface WALEntryFilter {
/**
- * Applies the filter, possibly returning a different HLog.Entry instance.
+ * Applies the filter, possibly returning a different Entry instance.
* If null is returned, the entry will be skipped.
- * @param entry WAL Entry to filter
- * @return a (possibly modified) HLog.Entry to use. Returning null or an entry with
+ * @param entry Entry to filter
+ * @return a (possibly modified) Entry to use. Returning null or an entry with
* no cells will cause the entry to be skipped for replication.
*/
- public HLog.Entry filter(HLog.Entry entry);
+ public Entry filter(Entry entry);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 8f099d7..525b7ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -61,17 +61,17 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
return files;
}
- final Set<String> hlogs = loadHLogsFromQueues();
+ final Set<String> wals = loadWALsFromQueues();
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
- String hlog = file.getPath().getName();
- boolean logInReplicationQueue = hlogs.contains(hlog);
+ String wal = file.getPath().getName();
+ boolean logInReplicationQueue = wals.contains(wal);
if (LOG.isDebugEnabled()) {
if (logInReplicationQueue) {
- LOG.debug("Found log in ZK, keeping: " + hlog);
+ LOG.debug("Found log in ZK, keeping: " + wal);
} else {
- LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+ LOG.debug("Didn't find this log in ZK, deleting: " + wal);
}
}
return !logInReplicationQueue;
@@ -79,15 +79,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
}
/**
- * Load all hlogs in all replication queues from ZK
+ * Load all wals in all replication queues from ZK
*/
- private Set<String> loadHLogsFromQueues() {
+ private Set<String> loadWALsFromQueues() {
List<String> rss = replicationQueues.getListOfReplicators();
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return ImmutableSet.of();
}
- Set<String> hlogs = Sets.newHashSet();
+ Set<String> wals = Sets.newHashSet();
for (String rs: rss) {
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
// if rs just died, this will be null
@@ -95,13 +95,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
continue;
}
for (String id : listOfPeers) {
- List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
- if (peersHlogs != null) {
- hlogs.addAll(peersHlogs);
+ List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+ if (peersWals != null) {
+ wals.addAll(peersWals);
}
}
}
- return hlogs;
+ return wals;
}
@Override
@@ -109,7 +109,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
// If replication is disabled, keep all members null
if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT)) {
- LOG.warn("Not configured - allowing all hlogs to be deleted");
+ LOG.warn("Not configured - allowing all wals to be deleted");
return;
}
// Make my own Configuration. Then I'll have my own connection to zk that
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 0906847..397044d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
@@ -136,7 +136,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
- List<HLog.Entry> entries = replicateContext.getEntries();
+ List<Entry> entries = replicateContext.getEntries();
int sleepMultiplier = 1;
while (this.isRunning()) {
if (!peersSelected) {
@@ -159,7 +159,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
" entries of total size " + replicateContext.getSize());
}
ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new HLog.Entry[entries.size()]));
+ entries.toArray(new Entry[entries.size()]));
// update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2c413f4..4729644 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -66,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/
@InterfaceAudience.Private
-public class Replication implements WALActionsListener,
+public class Replication extends WALActionsListener.Base implements
ReplicationSourceService, ReplicationSinkService {
private static final Log LOG =
LogFactory.getLog(Replication.class);
@@ -155,7 +155,7 @@ public class Replication implements WALActionsListener,
}
/*
- * Returns an object to listen to new hlog changes
+ * Returns an object to listen to new wal changes
**/
public WALActionsListener getWALActionsListener() {
return this;
@@ -222,13 +222,7 @@ public class Replication implements WALActionsListener,
}
@Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
- WALEdit logEdit) {
- // Not interested
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
WALEdit logEdit) {
scopeWALEdits(htd, logKey, logEdit);
}
@@ -240,7 +234,7 @@ public class Replication implements WALActionsListener,
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
*/
- public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey,
+ public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
WALEdit logEdit) {
NavigableMap<byte[], Integer> scopes =
new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
@@ -273,16 +267,6 @@ public class Replication implements WALActionsListener,
getReplicationManager().postLogRoll(newPath);
}
- @Override
- public void preLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
- @Override
- public void postLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested
- }
-
/**
* This method modifies the master's configuration in order to inject
* replication-related features
@@ -299,16 +283,6 @@ public class Replication implements WALActionsListener,
}
}
- @Override
- public void logRollRequested() {
- // Not interested
- }
-
- @Override
- public void logCloseRequested() {
- // not interested
- }
-
/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
deleted file mode 100644
index ccae169..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-
-import java.io.IOException;
-
-/**
- * Wrapper class around HLog to help manage the implementation details
- * such as compression.
- */
-@InterfaceAudience.Private
-public class ReplicationHLogReaderManager {
-
- private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
- private final FileSystem fs;
- private final Configuration conf;
- private long position = 0;
- private HLog.Reader reader;
- private Path lastPath;
-
- /**
- * Creates the helper but doesn't open any file
- * Use setInitialPosition after using the constructor if some content needs to be skipped
- * @param fs
- * @param conf
- */
- public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
- this.fs = fs;
- this.conf = conf;
- }
-
- /**
- * Opens the file at the current position
- * @param path
- * @return an HLog reader.
- * @throws IOException
- */
- public HLog.Reader openReader(Path path) throws IOException {
- // Detect if this is a new file, if so get a new reader else
- // reset the current reader so that we see the new data
- if (this.reader == null || !this.lastPath.equals(path)) {
- this.closeReader();
- this.reader = HLogFactory.createReader(this.fs, path, this.conf);
- this.lastPath = path;
- } else {
- try {
- this.reader.reset();
- } catch (NullPointerException npe) {
- throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
- }
- }
- return this.reader;
- }
-
- /**
- * Get the next entry, returned and also added in the array
- * @return a new entry or null
- * @throws IOException
- */
- public HLog.Entry readNextAndSetPosition() throws IOException {
- HLog.Entry entry = this.reader.next();
- // Store the position so that in the future the reader can start
- // reading from here. If the above call to next() throws an
- // exception, the position won't be changed and retry will happen
- // from the last known good position
- this.position = this.reader.getPosition();
- // We need to set the CC to null else it will be compressed when sent to the sink
- if (entry != null) {
- entry.setCompressionContext(null);
- }
- return entry;
- }
-
- /**
- * Advance the reader to the current position
- * @throws IOException
- */
- public void seek() throws IOException {
- if (this.position != 0) {
- this.reader.seek(this.position);
- }
- }
-
- /**
- * Get the position that we stopped reading at
- * @return current position, cannot be negative
- */
- public long getPosition() {
- return this.position;
- }
-
- public void setPosition(long pos) {
- this.position = pos;
- }
-
- /**
- * Close the current reader
- * @throws IOException
- */
- public void closeReader() throws IOException {
- if (this.reader != null) {
- this.reader.close();
- this.reader = null;
- }
- }
-
- /**
- * Tell the helper to reset internal state
- */
- void finishCurrentFile() {
- this.position = 0;
- try {
- this.closeReader();
- } catch (IOException e) {
- LOG.warn("Unable to close reader", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 7ed7bec..9a60131 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
* <p/>
* This replication process is currently waiting for the edits to be applied
* before the method can return. This means that the replication of edits
- * is synchronized (after reading from HLogs in ReplicationSource) and that a
+ * is synchronized (after reading from WALs in ReplicationSource) and that a
* single region server cannot receive edits from two sources at the same time
* <p/>
* This class uses the native HBase client in order to replicate entries.