You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/08/30 20:07:47 UTC

svn commit: r1519038 - /hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java

Author: jdcryans
Date: Fri Aug 30 18:07:47 2013
New Revision: 1519038

URL: http://svn.apache.org/r1519038
Log:
HBASE-9373 [replication] data loss because replication doesn't expect partial reads

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java?rev=1519038&r1=1519037&r2=1519038&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java Fri Aug 30 18:07:47 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.io.LimitInputStream;
+import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -147,7 +150,7 @@ public class ProtobufLogReader extends R
       ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
       if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
-        LOG.warn("No trailer found.");
+        LOG.trace("No trailer found.");
         return false;
       }
       if (trailerSize < 0) {
@@ -190,57 +193,73 @@ public class ProtobufLogReader extends R
   @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
     while (true) {
-      if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false;
+      long originalPosition = this.inputStream.getPos();
+      if (trailerPresent && originalPosition == this.walEditsStopOffset) return false;
       WALKey.Builder builder = WALKey.newBuilder();
-      boolean hasNext = false;
+      int size = 0;
       try {
-        hasNext = builder.mergeDelimitedFrom(inputStream);
-      } catch (InvalidProtocolBufferException ipbe) {
-        LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
-      }
-      if (!hasNext) return false;
-      if (!builder.isInitialized()) {
-        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
-        //       If we can get the KV count, we could, theoretically, try to get next record.
-        LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
-        return false;
-      }
-      WALKey walKey = builder.build();
-      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
-      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
-        LOG.warn("WALKey has no KVs that follow it; trying the next one");
-        continue;
-      }
-      int expectedCells = walKey.getFollowingKvCount();
-      long posBefore = this.inputStream.getPos();
-      try {
-        int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
-        if (expectedCells != actualCells) {
-          throw new EOFException("Only read " + actualCells); // other info added in catch
+        int originalAvailable = this.inputStream.available();
+        try {
+          int firstByte = this.inputStream.read();
+          if (firstByte == -1) {
+            throw new EOFException("First byte is negative");
+          }
+          size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
+          if (this.inputStream.available() < size) {
+            throw new EOFException("Available stream not enough for edit, " +
+                "inputStream.available()= " + this.inputStream.available() + ", " +
+                "entry size= " + size);
+          }
+          final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
+          builder.mergeFrom(limitedInput);
+        } catch (InvalidProtocolBufferException ipbe) {
+          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
+              originalPosition + ", currentPosition=" + this.inputStream.getPos() +
+              ", messageSize=" + size + ", originalAvailable=" + originalAvailable +
+              ", currentAvailable=" + this.inputStream.available()).initCause(ipbe);
+        }
+        if (!builder.isInitialized()) {
+          // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
+          //       If we can get the KV count, we could, theoretically, try to get next record.
+          throw new EOFException("Partial PB while reading WAL, " +
+              "probably an unexpected EOF, ignoring");
+        }
+        WALKey walKey = builder.build();
+        entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+        if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
+          LOG.trace("WALKey has no KVs that follow it; trying the next one");
+          continue;
         }
-      } catch (Exception ex) {
-        String posAfterStr = "<unknown>";
+        int expectedCells = walKey.getFollowingKvCount();
+        long posBefore = this.inputStream.getPos();
         try {
-          posAfterStr = this.inputStream.getPos() + "";
-        } catch (Throwable t) {
-           LOG.trace("Error getting pos for error message - ignoring", t);
+          int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+          if (expectedCells != actualCells) {
+            throw new EOFException("Only read " + actualCells); // other info added in catch
+          }
+        } catch (Exception ex) {
+          String posAfterStr = "<unknown>";
+          try {
+            posAfterStr = this.inputStream.getPos() + "";
+          } catch (Throwable t) {
+            LOG.trace("Error getting pos for error message - ignoring", t);
+          }
+          String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+              + posBefore + " and read up to " + posAfterStr;
+          IOException realEofEx = extractHiddenEof(ex);
+          throw (EOFException) new EOFException("EOF " + message).
+              initCause(realEofEx != null ? ex : realEofEx);
         }
-        String message = " while reading " + expectedCells + " WAL KVs; started reading at "
-            + posBefore + " and read up to " + posAfterStr;
-        IOException realEofEx = extractHiddenEof(ex);
-        if (realEofEx != null) {
-          LOG.error("EOF " + message, realEofEx);
-          return false;
+        if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
+          LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+              + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+              + this.walEditsStopOffset);
+          throw new EOFException("Read WALTrailer while reading WALEdits");
         }
-        message = "Error " + message;
-        LOG.error(message);
-        throw new IOException(message, ex);
-      }
-      if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
-        LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
-            + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
-            + this.walEditsStopOffset);
-        throw new IOException("Read WALTrailer while reading WALEdits");
+      } catch (EOFException eof) {
+        LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
+        seekOnFs(originalPosition);
+        return false;
       }
       return true;
     }