You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/08/30 20:07:47 UTC
svn commit: r1519038 -
/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
Author: jdcryans
Date: Fri Aug 30 18:07:47 2013
New Revision: 1519038
URL: http://svn.apache.org/r1519038
Log:
HBASE-9373 [replication] data loss because replication doesn't expect partial reads
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java?rev=1519038&r1=1519037&r2=1519038&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java Fri Aug 30 18:07:47 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.common.io.LimitInputStream;
+import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@@ -147,7 +150,7 @@ public class ProtobufLogReader extends R
ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
- LOG.warn("No trailer found.");
+ LOG.trace("No trailer found.");
return false;
}
if (trailerSize < 0) {
@@ -190,57 +193,73 @@ public class ProtobufLogReader extends R
@Override
protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) {
- if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false;
+ long originalPosition = this.inputStream.getPos();
+ if (trailerPresent && originalPosition == this.walEditsStopOffset) return false;
WALKey.Builder builder = WALKey.newBuilder();
- boolean hasNext = false;
+ int size = 0;
try {
- hasNext = builder.mergeDelimitedFrom(inputStream);
- } catch (InvalidProtocolBufferException ipbe) {
- LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
- }
- if (!hasNext) return false;
- if (!builder.isInitialized()) {
- // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
- // If we can get the KV count, we could, theoretically, try to get next record.
- LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
- return false;
- }
- WALKey walKey = builder.build();
- entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
- if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
- LOG.warn("WALKey has no KVs that follow it; trying the next one");
- continue;
- }
- int expectedCells = walKey.getFollowingKvCount();
- long posBefore = this.inputStream.getPos();
- try {
- int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
- if (expectedCells != actualCells) {
- throw new EOFException("Only read " + actualCells); // other info added in catch
+ int originalAvailable = this.inputStream.available();
+ try {
+ int firstByte = this.inputStream.read();
+ if (firstByte == -1) {
+ throw new EOFException("First byte is negative");
+ }
+ size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
+ if (this.inputStream.available() < size) {
+ throw new EOFException("Available stream not enough for edit, " +
+ "inputStream.available()= " + this.inputStream.available() + ", " +
+ "entry size= " + size);
+ }
+ final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
+ builder.mergeFrom(limitedInput);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
+ originalPosition + ", currentPosition=" + this.inputStream.getPos() +
+ ", messageSize=" + size + ", originalAvailable=" + originalAvailable +
+ ", currentAvailable=" + this.inputStream.available()).initCause(ipbe);
+ }
+ if (!builder.isInitialized()) {
+ // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
+ // If we can get the KV count, we could, theoretically, try to get next record.
+ throw new EOFException("Partial PB while reading WAL, " +
+ "probably an unexpected EOF, ignoring");
+ }
+ WALKey walKey = builder.build();
+ entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+ if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
+ LOG.trace("WALKey has no KVs that follow it; trying the next one");
+ continue;
}
- } catch (Exception ex) {
- String posAfterStr = "<unknown>";
+ int expectedCells = walKey.getFollowingKvCount();
+ long posBefore = this.inputStream.getPos();
try {
- posAfterStr = this.inputStream.getPos() + "";
- } catch (Throwable t) {
- LOG.trace("Error getting pos for error message - ignoring", t);
+ int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+ if (expectedCells != actualCells) {
+ throw new EOFException("Only read " + actualCells); // other info added in catch
+ }
+ } catch (Exception ex) {
+ String posAfterStr = "<unknown>";
+ try {
+ posAfterStr = this.inputStream.getPos() + "";
+ } catch (Throwable t) {
+ LOG.trace("Error getting pos for error message - ignoring", t);
+ }
+ String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ + posBefore + " and read up to " + posAfterStr;
+ IOException realEofEx = extractHiddenEof(ex);
+ throw (EOFException) new EOFException("EOF " + message).
+ initCause(realEofEx != null ? ex : realEofEx);
}
- String message = " while reading " + expectedCells + " WAL KVs; started reading at "
- + posBefore + " and read up to " + posAfterStr;
- IOException realEofEx = extractHiddenEof(ex);
- if (realEofEx != null) {
- LOG.error("EOF " + message, realEofEx);
- return false;
+ if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
+ LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+ + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ + this.walEditsStopOffset);
+ throw new EOFException("Read WALTrailer while reading WALEdits");
}
- message = "Error " + message;
- LOG.error(message);
- throw new IOException(message, ex);
- }
- if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
- LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
- + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
- + this.walEditsStopOffset);
- throw new IOException("Read WALTrailer while reading WALEdits");
+ } catch (EOFException eof) {
+ LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
+ seekOnFs(originalPosition);
+ return false;
}
return true;
}