You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ps...@apache.org on 2019/05/02 09:42:50 UTC

[hbase] branch branch-2.0 updated: HBASE-22340 Corrupt KeyValue is silently ignored (#207)

This is an automated email from the ASF dual-hosted git repository.

psomogyi pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5905051  HBASE-22340 Corrupt KeyValue is silently ignored (#207)
5905051 is described below

commit 59050518288fc91fe3b4a88fbc1861042287996e
Author: Peter Somogyi <ps...@apache.org>
AuthorDate: Thu May 2 11:17:28 2019 +0200

    HBASE-22340 Corrupt KeyValue is silently ignored (#207)
---
 .../java/org/apache/hadoop/hbase/KeyValueUtil.java | 165 ++++++++++++++-------
 .../hbase/regionserver/wal/ProtobufLogReader.java  |  34 ++---
 2 files changed, 120 insertions(+), 79 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 16ebdbf..d1e5c75 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Function;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -46,6 +48,8 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils
 @InterfaceAudience.Private
 public class KeyValueUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(KeyValueUtil.class);
+
   /**************** length *********************/
 
   /**
@@ -524,87 +528,120 @@ public class KeyValueUtil {
   }
 
   static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
+    if (buf == null) {
+      String msg = "Invalid to have null byte array in KeyValue.";
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
     int pos = offset, endOffset = offset + length;
     // check the key
     if (pos + Bytes.SIZEOF_INT > endOffset) {
-      throw new IllegalArgumentException(
-          "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length));
+      String msg =
+          "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
     pos += Bytes.SIZEOF_INT;
     if (keyLen <= 0 || pos + keyLen > endOffset) {
-      throw new IllegalArgumentException(
-          "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length));
+      String msg =
+          "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     // check the value
     if (pos + Bytes.SIZEOF_INT > endOffset) {
-      throw new IllegalArgumentException("Overflow when reading value length at position=" + pos
-          + bytesToHex(buf, offset, length));
+      String msg =
+          "Overflow when reading value length at position=" + pos + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
     pos += Bytes.SIZEOF_INT;
     if (valLen < 0 || pos + valLen > endOffset) {
-      throw new IllegalArgumentException("Invalid value length in KeyValue, valueLength=" + valLen
-          + bytesToHex(buf, offset, length));
+      String msg = "Invalid value length in KeyValue, valueLength=" + valLen +
+          bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     // check the row
     if (pos + Bytes.SIZEOF_SHORT > endOffset) {
-      throw new IllegalArgumentException(
-          "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length));
+      String msg =
+          "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
     pos += Bytes.SIZEOF_SHORT;
     if (rowLen < 0 || pos + rowLen > endOffset) {
-      throw new IllegalArgumentException(
-          "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length));
+      String msg =
+          "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     pos += rowLen;
     // check the family
     if (pos + Bytes.SIZEOF_BYTE > endOffset) {
-      throw new IllegalArgumentException("Overflow when reading family length at position=" + pos
-          + bytesToHex(buf, offset, length));
+      String msg = "Overflow when reading family length at position=" + pos +
+          bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     int familyLen = buf[pos];
     pos += Bytes.SIZEOF_BYTE;
     if (familyLen < 0 || pos + familyLen > endOffset) {
-      throw new IllegalArgumentException("Invalid family length in KeyValue, familyLength="
-          + familyLen + bytesToHex(buf, offset, length));
+      String msg = "Invalid family length in KeyValue, familyLength=" + familyLen +
+          bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     pos += familyLen;
     // check the qualifier
     int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
         - Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
     if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
-      throw new IllegalArgumentException("Invalid qualifier length in KeyValue, qualifierLen="
-          + qualifierLen + bytesToHex(buf, offset, length));
+      String msg = "Invalid qualifier length in KeyValue, qualifierLen=" + qualifierLen +
+              bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     pos += qualifierLen;
     // check the timestamp
     if (pos + Bytes.SIZEOF_LONG > endOffset) {
-      throw new IllegalArgumentException(
-          "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length));
+      String msg =
+          "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
     if (timestamp < 0) {
-      throw new IllegalArgumentException(
-          "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length));
+      String msg =
+          "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     pos += Bytes.SIZEOF_LONG;
     // check the type
     if (pos + Bytes.SIZEOF_BYTE > endOffset) {
-      throw new IllegalArgumentException(
-          "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length));
+      String msg =
+          "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     byte type = buf[pos];
     if (!Type.isValidType(type)) {
-      throw new IllegalArgumentException(
-          "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length));
+      String msg = "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     pos += Bytes.SIZEOF_BYTE;
     // check the value
     if (pos + valLen > endOffset) {
-      throw new IllegalArgumentException(
-          "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length));
+      String msg =
+          "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
     pos += valLen;
     // check the tags
@@ -613,39 +650,55 @@ public class KeyValueUtil {
         // withTags is true but no tag in the cell.
         return;
       }
-      if (pos + Bytes.SIZEOF_SHORT > endOffset) {
-        throw new IllegalArgumentException("Overflow when reading tags length at position=" + pos
-            + bytesToHex(buf, offset, length));
-      }
-      short tagsLen = Bytes.toShort(buf, pos);
-      pos += Bytes.SIZEOF_SHORT;
-      if (tagsLen < 0 || pos + tagsLen > endOffset) {
-        throw new IllegalArgumentException("Invalid tags length in KeyValue at position="
-            + (pos - Bytes.SIZEOF_SHORT) + bytesToHex(buf, offset, length));
-      }
-      int tagsEndOffset = pos + tagsLen;
-      for (; pos < tagsEndOffset;) {
-        if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
-          throw new IllegalArgumentException("Overflow when reading tag length at position=" + pos
-              + bytesToHex(buf, offset, length));
-        }
-        short tagLen = Bytes.toShort(buf, pos);
-        pos += Tag.TAG_LENGTH_SIZE;
-        // tagLen contains one byte tag type, so must be not less than 1.
-        if (tagLen < 1 || pos + tagLen > endOffset) {
-          throw new IllegalArgumentException(
-              "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
-                  + tagLen + bytesToHex(buf, offset, length));
-        }
-        pos += tagLen;
-      }
+      pos = checkKeyValueTagBytes(buf, offset, length, pos, endOffset);
     }
     if (pos != endOffset) {
-      throw new IllegalArgumentException("Some redundant bytes in KeyValue's buffer, startOffset="
-          + pos + ", endOffset=" + endOffset + bytesToHex(buf, offset, length));
+      String msg = "Some redundant bytes in KeyValue's buffer, startOffset=" + pos + ", endOffset="
+          + endOffset + bytesToHex(buf, offset, length);
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
     }
   }
 
+  private static int checkKeyValueTagBytes(byte[] buf, int offset, int length, int pos,
+      int endOffset) { // validates the tags section starting at pos; returns the position after it
+    if (pos + Bytes.SIZEOF_SHORT > endOffset) { // no room left for the tags-length short
+      String msg = "Overflow when reading tags length at position=" + pos +
+          bytesToHex(buf, offset, length); // offset/length are used only for this message suffix
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
+    }
+    short tagsLen = Bytes.toShort(buf, pos); // total length of all tags, read as a short
+    pos += Bytes.SIZEOF_SHORT;
+    if (tagsLen < 0 || pos + tagsLen > endOffset) { // negative, or tags would run past endOffset
+      String msg = "Invalid tags length in KeyValue at position=" + (pos - Bytes.SIZEOF_SHORT)
+          + bytesToHex(buf, offset, length); // reports where the length field began, not current pos
+      LOG.warn(msg);
+      throw new IllegalArgumentException(msg);
+    }
+    int tagsEndOffset = pos + tagsLen;
+    for (; pos < tagsEndOffset;) { // walk the tags one by one; pos is advanced inside the body
+      if (pos + Tag.TAG_LENGTH_SIZE > endOffset) { // no room left for this tag's length prefix
+        String msg = "Overflow when reading tag length at position=" + pos +
+            bytesToHex(buf, offset, length);
+        LOG.warn(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      short tagLen = Bytes.toShort(buf, pos); // length prefix of the current tag
+      pos += Tag.TAG_LENGTH_SIZE;
+      // tagLen contains one byte tag type, so must be not less than 1.
+      if (tagLen < 1 || pos + tagLen > endOffset) { // NOTE(review): bound is endOffset, not tagsEndOffset
+        String msg =
+            "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
+                + tagLen + bytesToHex(buf, offset, length);
+        LOG.warn(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      pos += tagLen; // skip over this tag's bytes
+    }
+    return pos; // the caller, checkKeyValueBytes, compares this against endOffset
+  }
+
   /**
    * Create a KeyValue reading from the raw InputStream. Named
    * <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 545bc21..21da2bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -331,9 +331,7 @@ public class ProtobufLogReader extends ReaderBase {
       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
       long originalPosition = this.inputStream.getPos();
       if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Reached end of expected edits area at offset " + originalPosition);
-        }
+        LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
         return false;
       }
       WALKey.Builder builder = WALKey.newBuilder();
@@ -371,10 +369,8 @@ public class ProtobufLogReader extends ReaderBase {
         WALKey walKey = builder.build();
         entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
         if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +
-                this.inputStream.getPos());
-          }
+          LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
+              this.inputStream.getPos());
           seekOnFs(originalPosition);
           return false;
         }
@@ -391,9 +387,7 @@ public class ProtobufLogReader extends ReaderBase {
           try {
             posAfterStr = this.inputStream.getPos() + "";
           } catch (Throwable t) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Error getting pos for error message - ignoring", t);
-            }
+            LOG.trace("Error getting pos for error message - ignoring", t);
           }
           String message = " while reading " + expectedCells + " WAL KVs; started reading at "
               + posBefore + " and read up to " + posAfterStr;
@@ -410,27 +404,21 @@ public class ProtobufLogReader extends ReaderBase {
       } catch (EOFException eof) {
         // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
         if (originalPosition < 0) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
-                + "because originalPosition is negative. last offset="
-                + this.inputStream.getPos(), eof);
-          }
+          LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
+              + "because originalPosition is negative. last offset={}",
+              this.inputStream.getPos(), eof);
           throw eof;
         }
         // If stuck at the same place and we got and exception, lets go back at the beginning.
         if (inputStream.getPos() == originalPosition && resetPosition) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since "
-                + "current position and original position match at " + originalPosition);
-          }
+          LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
+              + "current position and original position match at {}", originalPosition);
           seekOnFs(0);
         } else {
           // Else restore our position to original location in hope that next time through we will
           // read successfully.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
-                + "from " + inputStream.getPos()+" to " + originalPosition, eof);
-          }
+          LOG.warn("Encountered a malformed edit, seeking back to last good position in file, "
+              + "from {} to {}", inputStream.getPos(), originalPosition, eof);
           seekOnFs(originalPosition);
         }
         return false;