You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/05/23 23:58:30 UTC
svn commit: r1485865 - in /hbase/branches/0.95:
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/
hbase-server/src/test/java/o...
Author: tedyu
Date: Thu May 23 21:58:29 2013
New Revision: 1485865
URL: http://svn.apache.org/r1485865
Log:
HBASE-8497 Protobuf WAL also needs a trailer (Himanshu)
Modified:
hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
Modified: hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (original)
+++ hbase/branches/0.95/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java Thu May 23 21:58:29 2013
@@ -2922,6 +2922,306 @@ public final class WALProtos {
// @@protoc_insertion_point(class_scope:CompactionDescriptor)
}
+ public interface WALTrailerOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+ }
+ public static final class WALTrailer extends
+ com.google.protobuf.GeneratedMessage
+ implements WALTrailerOrBuilder {
+ // Use WALTrailer.newBuilder() to construct.
+ private WALTrailer(Builder builder) {
+ super(builder);
+ }
+ private WALTrailer(boolean noInit) {}
+
+ private static final WALTrailer defaultInstance;
+ public static WALTrailer getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public WALTrailer getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_fieldAccessorTable;
+ }
+
+ private void initFields() {
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer other = (org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer) obj;
+
+ boolean result = true;
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailerOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer build() {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = new org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer(this);
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDefaultInstance()) return this;
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+
+ // @@protoc_insertion_point(builder_scope:WALTrailer)
+ }
+
+ static {
+ defaultInstance = new WALTrailer(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:WALTrailer)
+ }
+
private static com.google.protobuf.Descriptors.Descriptor
internal_static_WALHeader_descriptor;
private static
@@ -2942,6 +3242,11 @@ public final class WALProtos {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_CompactionDescriptor_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_WALTrailer_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_WALTrailer_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -2962,11 +3267,11 @@ public final class WALProtos {
"ionDescriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021enco" +
"dedRegionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022",
"\027\n\017compactionInput\030\004 \003(\t\022\030\n\020compactionOu" +
- "tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t*F\n\tScop" +
- "eType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030RE" +
- "PLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.h" +
- "adoop.hbase.protobuf.generatedB\tWALProto" +
- "sH\001\210\001\000\240\001\001"
+ "tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALT" +
+ "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" +
+ "_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" +
+ "\n*org.apache.hadoop.hbase.protobuf.gener" +
+ "atedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3005,6 +3310,14 @@ public final class WALProtos {
new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", },
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor.class,
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor.Builder.class);
+ internal_static_WALTrailer_descriptor =
+ getDescriptor().getMessageTypes().get(4);
+ internal_static_WALTrailer_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_WALTrailer_descriptor,
+ new java.lang.String[] { },
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.class,
+ org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.Builder.class);
return null;
}
};
Modified: hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto (original)
+++ hbase/branches/0.95/hbase-protocol/src/main/protobuf/WAL.proto Thu May 23 21:58:29 2013
@@ -74,3 +74,10 @@ message CompactionDescriptor {
repeated string compactionOutput = 5;
required string storeHomeDir = 6;
}
+
+/**
+ * A trailer that is appended to the end of a properly closed HLog WAL file.
+ * If missing, this is either a legacy or a corrupted WAL file.
+ */
+message WALTrailer {
+}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu May 23 21:58:29 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.Writable;
@@ -48,6 +49,14 @@ public interface HLog {
/** The META region's HLog filename extension */
public static final String META_HLOG_FILE_EXTN = ".meta";
+ /**
+ * Configuration name of the HLog trailer's warning size. If a WALTrailer's size is greater than
+ * the configured size, a warning is logged. This is used with Protobuf reader/writer.
+ */
+ public static final String WAL_TRAILER_WARN_SIZE =
+ "hbase.regionserver.waltrailer.warn.size";
+ public static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024*1024; // 1MB
+
static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
@@ -71,6 +80,12 @@ public interface HLog {
long getPosition() throws IOException;
void reset() throws IOException;
+
+ /**
+ * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL
+ * files.
+ */
+ WALTrailer getWALTrailer();
}
public interface Writer {
@@ -83,6 +98,12 @@ public interface HLog {
void append(Entry entry) throws IOException;
long getLength() throws IOException;
+
+ /**
+ * Sets HLog's WALTrailer. This trailer is appended at the end of WAL on closing.
+ * @param walTrailer trailer to append to WAL.
+ */
+ void setWALTrailer(WALTrailer walTrailer);
}
/**
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java Thu May 23 21:58:29 2013
@@ -21,32 +21,45 @@ package org.apache.hadoop.hbase.regionse
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.InvalidProtocolBufferException;
/**
- * Reader for protobuf-based WAL.
+ * A Protobuf based WAL has the following structure:
+ * <p>
+ * <PB_WAL_MAGIC><WALHeader><WALEdits>...<WALEdits><Trailer>
+ * <TrailerSize> <PB_WAL_COMPLETE_MAGIC>
+ * </p>
+ * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
+ * {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure
+ * which is appended at the end of the WAL. This is empty for now; it can contain some meta
+ * information such as Region level stats, etc in future.
*/
@InterfaceAudience.Private
public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
-
+ static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
private FSDataInputStream inputStream;
private Codec.Decoder cellDecoder;
private WALCellCodec.ByteStringUncompressor byteStringUncompressor;
private boolean hasCompression = false;
+ // walEditsStopOffset is the offset just past the last WALEdit byte (the start of the trailer, or
+ // the file length if no trailer). After the last WALEdit is read, the stream position equals it.
+ private long walEditsStopOffset;
+ private boolean trailerPresent;
public ProtobufLogReader() {
super();
@@ -97,7 +110,67 @@ public class ProtobufLogReader extends R
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
}
this.inputStream = stream;
+ this.walEditsStopOffset = this.fileLength;
+ long currentPosition = stream.getPos();
+ trailerPresent = setTrailerIfPresent();
+ this.seekOnFs(currentPosition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
+ + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
+ }
+ }
+ /**
+ * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
+ * PB_WAL_COMPLETE_MAGIC.length - Bytes.SIZEOF_INT). It reads the int value to know the size of
+ * the trailer, and checks whether the trailer is present at the end or not by comparing the last
+ * PB_WAL_COMPLETE_MAGIC.length bytes. In case the trailer is not present, it returns false;
+ * otherwise, it sets the trailer and sets this.walEditsStopOffset to the position where the
+ * trailer starts.
+ * The trailer is ignored in case:
+ * <ul>
+ * <li>fileLength is 0 or not correct (when file is under recovery, etc).
+ * <li>the trailer size is negative.
+ * </ul>
+ * <p>
+ * In case the trailer size > this.trailerWarnSize, it is read after a WARN message.
+ * @return true if a valid trailer is present; false if it is absent, invalid, or an
+ * IOException occurred while reading it (the exception is logged, not thrown)
+ */
+ private boolean setTrailerIfPresent() {
+ try {
+ long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
+ if (trailerSizeOffset <= 0) return false;// no trailer possible.
+ this.seekOnFs(trailerSizeOffset);
+ // read the int as trailer size.
+ int trailerSize = this.inputStream.readInt();
+ ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
+ this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
+ if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
+ LOG.warn("No trailer found.");
+ return false;
+ }
+ if (trailerSize < 0) {
+ LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
+ return false;
+ } else if (trailerSize > this.trailerWarnSize) {
+ // continue reading after warning the user.
+ LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
+ + trailerSize + " > " + this.trailerWarnSize);
+ }
+ // seek to the position where trailer starts.
+ long positionOfTrailer = trailerSizeOffset - trailerSize;
+ this.seekOnFs(positionOfTrailer);
+ // read the trailer.
+ buf = ByteBuffer.allocate(trailerSize);// for trailer.
+ this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
+ trailer = WALTrailer.parseFrom(buf.array());
+ this.walEditsStopOffset = positionOfTrailer;
+ return true;
+ } catch (IOException ioe) {
+ LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
+ }
+ return false;
}
@Override
@@ -117,6 +190,7 @@ public class ProtobufLogReader extends R
@Override
protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) {
+ if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false;
WALKey.Builder builder = WALKey.newBuilder();
boolean hasNext = false;
try {
@@ -162,6 +236,12 @@ public class ProtobufLogReader extends R
LOG.error(message);
throw new IOException(message, ex);
}
+ if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
+ LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+ + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ + this.walEditsStopOffset);
+ throw new IOException("Read WALTrailer while reading WALEdits");
+ }
return true;
}
}
@@ -186,6 +266,11 @@ public class ProtobufLogReader extends R
}
@Override
+ public WALTrailer getWALTrailer() {
+ return trailer;
+ }
+
+ @Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java Thu May 23 21:58:29 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
/**
* Writer for protobuf-based WAL.
@@ -43,7 +44,11 @@ public class ProtobufLogWriter implement
private FSDataOutputStream output;
private Codec.Encoder cellEncoder;
private WALCellCodec.ByteStringCompressor compressor;
-
+ private boolean trailerWritten;
+ private WALTrailer trailer;
+ // warning threshold for the WAL trailer size, in bytes. If a user writes a trailer larger than
+ // this size, it is still written, but with a WARN message in the log.
+ private int trailerWarnSize;
/** Context used by our wal dictionary compressor.
* Null if we're not to do our custom dictionary compression. */
@@ -64,6 +69,8 @@ public class ProtobufLogWriter implement
throw new IOException("Failed to initiate CompressionContext", e);
}
}
+ this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
+ HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = FSUtils.getDefaultBufferSize(fs);
short replication = (short)conf.getInt(
"hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
@@ -78,6 +85,8 @@ public class ProtobufLogWriter implement
if (doCompress) {
this.compressor = codec.getByteStringCompressor();
}
+ // instantiate trailer to default value.
+ trailer = WALTrailer.newBuilder().build();
LOG.debug("Writing protobuf WAL; path=" + path + ", compression=" + doCompress);
}
@@ -96,6 +105,7 @@ public class ProtobufLogWriter implement
public void close() throws IOException {
if (this.output != null) {
try {
+ if (!trailerWritten) writeWALTrailer();
this.output.close();
} catch (NullPointerException npe) {
// Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@@ -105,6 +115,28 @@ public class ProtobufLogWriter implement
}
}
+ private void writeWALTrailer() {
+ try {
+ int trailerSize = 0;
+ if (this.trailer == null) {
+ // use default trailer.
+ LOG.warn("WALTrailer is null. Continuing with default.");
+ this.trailer = WALTrailer.newBuilder().build();
+ trailerSize = this.trailer.getSerializedSize();
+ } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
+ // continue writing after warning the user.
+ LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " +
+ trailerSize + " > " + this.trailerWarnSize);
+ }
+ this.trailer.writeTo(output);
+ this.output.writeInt(trailerSize);
+ this.output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
+ this.trailerWritten = true;
+ } catch (IOException ioe) {
+ LOG.error("Got IOException while writing trailer", ioe);
+ }
+ }
+
@Override
public void sync() throws IOException {
try {
@@ -129,4 +161,9 @@ public class ProtobufLogWriter implement
public FSDataOutputStream getStream() {
return this.output;
}
+
+ @Override
+ public void setWALTrailer(WALTrailer walTrailer) {
+ this.trailer = walTrailer;
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java Thu May 23 21:58:29 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
@InterfaceAudience.Private
public abstract class ReaderBase implements HLog.Reader {
@@ -33,6 +34,11 @@ public abstract class ReaderBase impleme
protected FileSystem fs;
protected Path path;
protected long edit = 0;
+ protected long fileLength;
+ protected WALTrailer trailer;
+ // warning threshold for the WAL trailer size, in bytes. If a user reads a trailer larger than
+ // this size, it is still read, but with a WARN message in the log.
+ protected int trailerWarnSize;
/**
* Compression context to use reading. Can be null if no compression.
*/
@@ -51,7 +57,9 @@ public abstract class ReaderBase impleme
this.conf = conf;
this.path = path;
this.fs = fs;
-
+ this.fileLength = this.fs.getFileStatus(path).getLen();
+ this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
+ HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
initReader(stream);
boolean compression = hasCompression();
@@ -134,4 +142,8 @@ public abstract class ReaderBase impleme
*/
protected abstract void seekOnFs(long pos) throws IOException;
+ @Override
+ public WALTrailer getWALTrailer() {
+ return null;
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Thu May 23 21:58:29 2013
@@ -25,7 +25,6 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.NavigableMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -35,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.Text;
@InterfaceAudience.Private
public class SequenceFileLogReader extends ReaderBase {
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Thu May 23 21:58:29 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -232,4 +233,11 @@ public class SequenceFileLogWriter imple
public FSDataOutputStream getWriterFSDataOutputStream() {
return this.writer_out;
}
+
+ /**
+ * Intentionally a no-op: a trailer is added only in Protobuf based hlog readers/writers.
+ */
+ @Override
+ public void setWALTrailer(WALTrailer walTrailer) {
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu May 23 21:58:29 2013
@@ -822,6 +822,93 @@ public class TestHLog {
}
}
+ /**
+ * Runs {@link #doRead(boolean)} twice: first for a WAL with a trailer, then for one without.
+ * @throws IOException if either doRead invocation fails with an I/O error
+ */
+ @Test
+ public void testWALTrailer() throws IOException {
+ // Same order as before: the with-trailer read runs first, the without-trailer read second.
+ for (boolean withTrailer : new boolean[] { true, false }) {
+ doRead(withTrailer);
+ }
+ }
+
+ /**
+ * Appends entries to a WAL, reads them back, and checks the trailer the reader reports.
+ * @param withTrailer If 'withTrailer' is true, the WAL writer is closed before reading so that
+ * a trailer is appended to the WAL, and the reader must report a non-null trailer.
+ * Otherwise reading starts right after the sync call, so the reader is not aware of any
+ * trailer and must report null. In both cases every appended entry must be read back
+ * from the ProtobufLogReader, and the read after the last entry must return null.
+ * @throws IOException if a filesystem, writer or reader call made here fails
+ */
+ private void doRead(boolean withTrailer) throws IOException {
+ final int columnCount = 5;
+ final int recordCount = 5;
+ final byte[] tableName = Bytes.toBytes("tablename");
+ final byte[] row = Bytes.toBytes("row");
+ long timestamp = System.currentTimeMillis();
+ Path path = new Path(dir, "temphlog");
+ HLog.Writer writer = null;
+ HLog.Reader reader = null;
+ try {
+ HRegionInfo hri = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HTableDescriptor htd = new HTableDescriptor(tableName); // NOTE(review): populated below, never read
+ fs.mkdirs(dir);
+ // Write recordCount entries of columnCount KeyValues each; the reader is asserted to be pb.
+ writer = HLogFactory.createWriter(fs, path, conf);
+ for (int i = 0; i < recordCount; ++i) {
+ HLogKey key = new HLogKey(
+ hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
+ WALEdit edit = new WALEdit();
+ for (int j = 0; j < columnCount; ++j) {
+ if (i == 0) { // add the families only while building the first record
+ htd.addFamily(new HColumnDescriptor("column" + j));
+ }
+ String value = i + "" + j; // record index followed by column index; re-derived when reading
+ edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
+ }
+ writer.append(new HLog.Entry(key, edit));
+ }
+ writer.sync();
+ if (withTrailer) writer.close(); // closing before the reader opens is the with-trailer case
+
+ // Now read the log using standard means.
+ reader = HLogFactory.createReader(fs, path, conf);
+ assertTrue(reader instanceof ProtobufLogReader);
+ if (withTrailer) {
+ assertNotNull(reader.getWALTrailer());
+ } else {
+ assertNull(reader.getWALTrailer());
+ }
+ for (int i = 0; i < recordCount; ++i) {
+ HLog.Entry entry = reader.next();
+ assertNotNull(entry);
+ assertEquals(columnCount, entry.getEdit().size());
+ assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+ assertArrayEquals(tableName, entry.getKey().getTablename());
+ int idx = 0;
+ for (KeyValue val : entry.getEdit().getKeyValues()) {
+ assertTrue(Bytes.equals(row, val.getRow()));
+ String value = i + "" + idx; // same "record index + column index" text that was written
+ assertArrayEquals(Bytes.toBytes(value), val.getValue());
+ idx++;
+ }
+ }
+ HLog.Entry entry = reader.next(); // one read past the last appended record
+ assertNull(entry);
+ } finally {
+ if (writer != null) {
+ writer.close(); // NOTE(review): second close() on this writer when withTrailer is true
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
static class DumbWALActionsListener implements WALActionsListener {
int increments = 0;
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1485865&r1=1485864&r2=1485865&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Thu May 23 21:58:29 2013
@@ -133,6 +133,7 @@ public class TestHLogSplit {
INSERT_GARBAGE_IN_THE_MIDDLE,
APPEND_GARBAGE,
TRUNCATE,
+ TRUNCATE_TRAILER
}
@BeforeClass
@@ -662,6 +663,38 @@ public class TestHLogSplit {
}
@Test
+ public void testCorruptWALTrailer() throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ // Split one hlog of a single region after corruptHLog has truncated its trailer.
+ final String REGION = "region__1";
+ REGIONS.removeAll(REGIONS);
+ REGIONS.add(REGION);
+
+ int entryCount = 10;
+ Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
+ generateHLogs(1, entryCount, -1);
+ corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
+
+ fs.initialize(fs.getUri(), conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+ HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
+ logSplitter.splitLog();
+
+ Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+ // Every generated entry must be readable from the split output; always release the reader.
+ int actualCount = 0;
+ HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
+ try {
+ while (in.next() != null) { ++actualCount; }
+ } finally { in.close(); }
+ assertEquals(entryCount, actualCount);
+
+ // a log whose trailer was truncated should not have been stored as corrupt
+ FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
+ assertEquals(0, archivedLogs.length);
+ }
+
+ @Test
public void testLogsGetArchivedAfterSplit() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
@@ -1462,9 +1495,16 @@ public class TestHLogSplit {
case TRUNCATE:
fs.delete(path, false);
out = fs.create(path);
- out.write(corrupted_bytes, 0, fileSize-32);
+ out.write(corrupted_bytes, 0, fileSize
+ - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
closeOrFlush(close, out);
+ break;
+ case TRUNCATE_TRAILER:
+ fs.delete(path, false);
+ out = fs.create(path);
+ out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
+ closeOrFlush(close, out);
break;
}
}