You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/05/09 03:03:02 UTC
svn commit: r1480511 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/codec/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/
Author: sershe
Date: Thu May 9 01:03:02 2013
New Revision: 1480511
URL: http://svn.apache.org/r1480511
Log:
HBASE-8498 PB WAL reading is broken due to some partial reads
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1480511&r1=1480510&r2=1480511&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Thu May 9 01:03:02 2013
@@ -20,7 +20,9 @@
package org.apache.hadoop.hbase;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -2330,9 +2333,16 @@ public class KeyValue implements Cell, H
*/
public static KeyValue iscreate(final InputStream in) throws IOException {
byte [] intBytes = new byte[Bytes.SIZEOF_INT];
- int length = in.read(intBytes);
- if (length == 0) return null;
- if (length != intBytes.length) throw new IOException("Failed read of int length " + length);
+ int bytesRead = 0;
+ while (bytesRead < intBytes.length) {
+ int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
+ if (n < 0) {
+ if (bytesRead == 0) return null; // EOF at start is ok
+ throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+ }
+ bytesRead += n;
+ }
+ // TODO: perhaps some sanity check is needed here.
byte [] bytes = new byte[Bytes.toInt(intBytes)];
IOUtils.readFully(in, bytes, 0, bytes.length);
return new KeyValue(bytes, 0, bytes.length);
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java?rev=1480511&r1=1480510&r2=1480511&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java Thu May 9 01:03:02 2013
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.codec;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
public abstract class BaseDecoder implements Codec.Decoder {
+ protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
protected final InputStream in;
private boolean hasNext = true;
private Cell current = null;
@@ -34,14 +38,32 @@ public abstract class BaseDecoder implem
@Override
public boolean advance() throws IOException {
if (!this.hasNext) return this.hasNext;
- if (this.in.available() <= 0) {
+ if (this.in.available() == 0) {
this.hasNext = false;
return this.hasNext;
}
- this.current = parseCell();
+ try {
+ this.current = parseCell();
+ } catch (IOException ioEx) {
+ rethrowEofException(ioEx);
+ }
return this.hasNext;
}
+ private void rethrowEofException(IOException ioEx) throws IOException {
+ boolean isEof = false;
+ try {
+ isEof = this.in.available() == 0;
+ } catch (Throwable t) {
+ LOG.trace("Error getting available for error message - ignoring", t);
+ }
+ if (!isEof) throw ioEx;
+ LOG.error("Partial cell read caused by EOF: " + ioEx);
+ EOFException eofEx = new EOFException("Partial cell read");
+ eofEx.initCause(ioEx);
+ throw eofEx;
+ }
+
/**
* @return extract a Cell
* @throws IOException
@@ -52,4 +74,4 @@ public abstract class BaseDecoder implem
public Cell current() {
return this.current;
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java?rev=1480511&r1=1480510&r2=1480511&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java Thu May 9 01:03:02 2013
@@ -116,44 +116,63 @@ public class ProtobufLogReader extends R
@Override
protected boolean readNext(HLog.Entry entry) throws IOException {
- WALKey.Builder builder = WALKey.newBuilder();
- boolean hasNext = false;
- try {
- hasNext = builder.mergeDelimitedFrom(inputStream);
- } catch (InvalidProtocolBufferException ipbe) {
- LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
- }
- if (!hasNext) return false;
- if (!builder.isInitialized()) {
- // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
- // If we can get the KV count, we could, theoretically, try to get next record.
- LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
- return false;
- }
- WALKey walKey = builder.build();
- entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
- try {
+ while (true) {
+ WALKey.Builder builder = WALKey.newBuilder();
+ boolean hasNext = false;
+ try {
+ hasNext = builder.mergeDelimitedFrom(inputStream);
+ } catch (InvalidProtocolBufferException ipbe) {
+ LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
+ }
+ if (!hasNext) return false;
+ if (!builder.isInitialized()) {
+ // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
+ // If we can get the KV count, we could, theoretically, try to get next record.
+ LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
+ return false;
+ }
+ WALKey walKey = builder.build();
+ entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+ if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
+ LOG.warn("WALKey has no KVs that follow it; trying the next one");
+ continue;
+ }
int expectedCells = walKey.getFollowingKvCount();
- int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
- if (expectedCells != actualCells) {
- throw new EOFException("Unable to read " + expectedCells + " cells, got " + actualCells);
+ long posBefore = this.inputStream.getPos();
+ try {
+ int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+ if (expectedCells != actualCells) {
+ throw new EOFException("Only read " + actualCells); // other info added in catch
+ }
+ } catch (Exception ex) {
+ String posAfterStr = "<unknown>";
+ try {
+ posAfterStr = this.inputStream.getPos() + "";
+ } catch (Throwable t) {
+ LOG.trace("Error getting pos for error message - ignoring", t);
+ }
+ String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ + posBefore + " and read up to " + posAfterStr;
+ IOException realEofEx = extractHiddenEof(ex);
+ if (realEofEx != null) {
+ LOG.error("EOF " + message, realEofEx);
+ return false;
+ }
+ message = "Error " + message;
+ LOG.error(message);
+ throw new IOException(message, ex);
}
- } catch (EOFException ex) {
- LOG.error("EOF while reading KVs, ignoring", ex);
- return false;
- } catch (Exception ex) {
- IOException realEofEx = extractHiddenEofOrRethrow(ex);
- LOG.error("EOF while reading KVs, ignoring", realEofEx);
- return false;
+ return true;
}
- return true;
}
- private IOException extractHiddenEofOrRethrow(Exception ex) throws IOException {
+ private IOException extractHiddenEof(Exception ex) {
// There are two problems we are dealing with here. Hadoop stream throws generic exception
// for EOF, not EOFException; and scanner further hides it inside RuntimeException.
IOException ioEx = null;
- if (ex instanceof IOException) {
+ if (ex instanceof EOFException) {
+ return (EOFException)ex;
+ } else if (ex instanceof IOException) {
ioEx = (IOException)ex;
} else if (ex instanceof RuntimeException
&& ex.getCause() != null && ex.getCause() instanceof IOException) {
@@ -161,9 +180,9 @@ public class ProtobufLogReader extends R
}
if (ioEx != null) {
if (ioEx.getMessage().contains("EOF")) return ioEx;
- throw ioEx;
+ return null;
}
- throw new IOException(ex);
+ return null;
}
@Override