You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/05/09 03:03:02 UTC

svn commit: r1480511 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/

Author: sershe
Date: Thu May  9 01:03:02 2013
New Revision: 1480511

URL: http://svn.apache.org/r1480511
Log:
HBASE-8498 PB WAL reading is broken due to some partial reads

Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1480511&r1=1480510&r2=1480511&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Thu May  9 01:03:02 2013
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -2330,9 +2333,16 @@ public class KeyValue implements Cell, H
    */
   public static KeyValue iscreate(final InputStream in) throws IOException {
     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
-    int length = in.read(intBytes);
-    if (length == 0) return null;
-    if (length != intBytes.length) throw new IOException("Failed read of int length " + length);
+    int bytesRead = 0;
+    while (bytesRead < intBytes.length) {
+      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
+      if (n < 0) {
+        if (bytesRead == 0) return null; // EOF at start is ok
+        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+      }
+      bytesRead += n;
+    }
+    // TODO: perhaps some sanity check is needed here.
     byte [] bytes = new byte[Bytes.toInt(intBytes)];
     IOUtils.readFully(in, bytes, 0, bytes.length);
     return new KeyValue(bytes, 0, bytes.length);

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java?rev=1480511&r1=1480510&r2=1480511&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java Thu May  9 01:03:02 2013
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.codec;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 
 public abstract class BaseDecoder implements Codec.Decoder {
+  protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
   protected final InputStream in;
   private boolean hasNext = true;
   private Cell current = null;
@@ -34,14 +38,32 @@ public abstract class BaseDecoder implem
   @Override
   public boolean advance() throws IOException {
     if (!this.hasNext) return this.hasNext;
-    if (this.in.available() <= 0) {
+    if (this.in.available() == 0) { // stream reports nothing left to read: stop iterating
       this.hasNext = false;
       return this.hasNext;
     }
-    this.current = parseCell();
+    try {
+      this.current = parseCell();
+    } catch (IOException ioEx) {
+      rethrowEofException(ioEx); // always throws: EOFException if the stream is drained, else ioEx
+    }
     return this.hasNext;
   }
 
+  private void rethrowEofException(IOException ioEx) throws IOException { // never returns normally
+    boolean isEof = false;
+    try {
+      isEof = this.in.available() == 0; // NOTE(review): available() is only an estimate; confirm 0 means EOF here
+    } catch (Throwable t) { // best effort: on failure isEof stays false and ioEx is rethrown as-is
+      LOG.trace("Error getting available for error message - ignoring", t);
+    }
+    if (!isEof) throw ioEx; // stream still has data, so this was not a truncated read
+    LOG.error("Partial cell read caused by EOF: " + ioEx);
+    EOFException eofEx = new EOFException("Partial cell read");
+    eofEx.initCause(ioEx); // EOFException has no (message, cause) constructor
+    throw eofEx;
+  }
+
   /**
    * @return extract a Cell
    * @throws IOException
@@ -52,4 +74,4 @@ public abstract class BaseDecoder implem
   public Cell current() {
     return this.current;
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java?rev=1480511&r1=1480510&r2=1480511&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java Thu May  9 01:03:02 2013
@@ -116,44 +116,63 @@ public class ProtobufLogReader extends R
 
   @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
-    WALKey.Builder builder = WALKey.newBuilder();
-    boolean hasNext = false;
-    try {
-      hasNext = builder.mergeDelimitedFrom(inputStream);
-    } catch (InvalidProtocolBufferException ipbe) {
-      LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
-    }
-    if (!hasNext) return false;
-    if (!builder.isInitialized()) {
-      // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
-      //       If we can get the KV count, we could, theoretically, try to get next record.
-      LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
-      return false;
-    }
-    WALKey walKey = builder.build();
-    entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
-    try {
+    while (true) { // loops only to skip WALKeys that have no KVs following them
+      WALKey.Builder builder = WALKey.newBuilder();
+      boolean hasNext = false;
+      try {
+        hasNext = builder.mergeDelimitedFrom(inputStream);
+      } catch (InvalidProtocolBufferException ipbe) { // hasNext stays false, so we return false below
+        LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
+      }
+      if (!hasNext) return false;
+      if (!builder.isInitialized()) {
+        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
+        //       If we can get the KV count, we could, theoretically, try to get next record.
+        LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
+        return false;
+      }
+      WALKey walKey = builder.build();
+      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
+        LOG.warn("WALKey has no KVs that follow it; trying the next one");
+        continue; // nothing to decode for this key; read the next WALKey
+      }
       int expectedCells = walKey.getFollowingKvCount();
-      int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
-      if (expectedCells != actualCells) {
-        throw new EOFException("Unable to read " + expectedCells + " cells, got " + actualCells);
+      long posBefore = this.inputStream.getPos(); // kept only for the error message below
+      try {
+        int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+        if (expectedCells != actualCells) {
+          throw new EOFException("Only read " + actualCells); // other info added in catch
+        }
+      } catch (Exception ex) { // also receives the EOFException thrown just above
+        String posAfterStr = "<unknown>";
+        try {
+          posAfterStr = this.inputStream.getPos() + "";
+        } catch (Throwable t) { // position is diagnostic only; keep "<unknown>" on failure
+           LOG.trace("Error getting pos for error message - ignoring", t);
+        }
+        String message = " while reading " + expectedCells + " WAL KVs; started reading at " // NOTE(review): leading space gives "EOF  while"/"Error  while"
+            + posBefore + " and read up to " + posAfterStr;
+        IOException realEofEx = extractHiddenEof(ex); // null when ex is not recognised as an EOF
+        if (realEofEx != null) {
+          LOG.error("EOF " + message, realEofEx);
+          return false; // EOF in the middle of the KVs is reported as "no more entries"
+        }
+        message = "Error " + message;
+        LOG.error(message);
+        throw new IOException(message, ex); // non-EOF failure: rethrow with position info, cause kept
       }
-    } catch (EOFException ex) {
-      LOG.error("EOF while reading KVs, ignoring", ex);
-      return false;
-    } catch (Exception ex) {
-      IOException realEofEx = extractHiddenEofOrRethrow(ex);
-      LOG.error("EOF while reading KVs, ignoring", realEofEx);
-      return false;
+      return true;
     }
-    return true;
   }
 
-  private IOException extractHiddenEofOrRethrow(Exception ex) throws IOException {
+  private IOException extractHiddenEof(Exception ex) {
     // There are two problems we are dealing with here. Hadoop stream throws generic exception
     // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
     IOException ioEx = null;
-    if (ex instanceof IOException) {
+    if (ex instanceof EOFException) {
+      return (EOFException)ex;
+    } else if (ex instanceof IOException) {
       ioEx = (IOException)ex;
     } else if (ex instanceof RuntimeException
         && ex.getCause() != null && ex.getCause() instanceof IOException) {
@@ -161,9 +180,9 @@ public class ProtobufLogReader extends R
     }
     if (ioEx != null) {
       if (ioEx.getMessage().contains("EOF")) return ioEx;
-      throw ioEx;
+      return null;
     }
-    throw new IOException(ex);
+    return null;
   }
 
   @Override