Posted to commits@hbase.apache.org by zh...@apache.org on 2023/02/28 02:08:38 UTC

[hbase] branch branch-2.5 updated: HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new c319c6f0a31 HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)
c319c6f0a31 is described below

commit c319c6f0a316446342fd9a03ba72d14e8dc7cb36
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Feb 28 09:47:37 2023 +0800

    HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    (cherry picked from commit d1fede72c340f6eeb145410e88b089e636fd6f5e)
---
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  50 ++++++++++
 .../hbase/shaded/protobuf/TestProtobufUtil.java    |  19 ++++
 .../hbase/regionserver/wal/ProtobufLogReader.java  | 110 +++++++++++++--------
 .../regionserver/wal/AbstractTestProtobufLog.java  |   5 +
 4 files changed, 145 insertions(+), 39 deletions(-)
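
For context on the bug being fixed: protobuf's Parser.parseDelimitedFrom reads a varint length
and then parses from a length-limited view of the stream, so when the underlying stream hits EOF
early, the parse can still finish without error and return a message that is silently missing its
trailing fields. A minimal sketch of the pitfall in Java, assuming an instance msg of a
hypothetical generated message class MyMessage whose fields are all optional:

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    msg.writeDelimitedTo(bos);                          // varint length + message body
    byte[] full = bos.toByteArray();
    byte[] cut = Arrays.copyOf(full, full.length - 3);  // simulate a truncated stream
    // May return "successfully", with trailing fields silently absent, when the
    // cut happens to fall on a field boundary:
    MyMessage partial = MyMessage.parseDelimitedFrom(new ByteArrayInputStream(cut));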

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1c1125f6aea..68377bde13b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
 
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Constructor;
@@ -133,6 +134,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@@ -3567,4 +3569,52 @@ public final class ProtobufUtil {
       .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
   }
 
+  /**
+   * Check whether this IPBE indicates EOF or not.
+   * <p/>
+   * We check the exception message: if it is likely the one produced by
+   * InvalidProtocolBufferException.truncatedMessage, we consider it an EOF; otherwise we do not.
+   */
+  public static boolean isEOF(InvalidProtocolBufferException e) {
+    return e.getMessage().contains("input has been truncated");
+  }
+
+  /**
+   * A wrapper around the PB message's parseDelimitedFrom. The difference is that, if we cannot
+   * determine whether there are enough bytes in the stream, i.e., the available method does not
+   * return a valid value, we first read all the bytes into a byte array, and then parse the PB
+   * message with {@link Parser#parseFrom(byte[])} instead of calling
+   * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because, even if there are
+   * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without
+   * any error and just leave us a partial PB message.
+   * @return The PB message if we can parse it successfully; otherwise an exception is always
+   *         thrown. This method never returns {@code null}.
+   */
+  public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
+    throws IOException {
+    int firstByte = in.read();
+    if (firstByte < 0) {
+      throw new EOFException("EOF while reading message size");
+    }
+    int size = CodedInputStream.readRawVarint32(firstByte, in);
+    int available = in.available();
+    if (available > 0) {
+      if (available < size) {
+        throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+          + size + " bytes, but only " + available + " bytes available");
+      }
+      // this piece of code is copied from GeneratedMessageV3.parseFrom
+      try {
+        return parser.parseFrom(ByteStreams.limit(in, size));
+      } catch (InvalidProtocolBufferException e) {
+        throw e.unwrapIOException();
+      }
+    } else {
+      // this usually means the stream does not have a proper available implementation, so let's
+      // read the content into a byte array before parsing.
+      byte[] bytes = new byte[size];
+      ByteStreams.readFully(in, bytes);
+      return parser.parseFrom(bytes);
+    }
+  }
 }
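
A minimal usage sketch of the new helper, mirroring the ProtobufLogReader change below (the
stream variable "in" and the handling shown are illustrative only):

    try {
      WALKey walKey = ProtobufUtil.parseDelimitedFrom(in, WALKey.parser());
      // ... use walKey ...
    } catch (EOFException e) {
      // the length varint or the message body was truncated
    } catch (InvalidProtocolBufferException e) {
      if (ProtobufUtil.isEOF(e)) {
        // truncation reported by the parser itself; treat as EOF
      } else {
        throw e; // likely genuine corruption
      }
    }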
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index b27d832ee8c..fc442b8998d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -53,6 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.Any;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -574,4 +576,21 @@ public class TestProtobufUtil {
     List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
     assertEquals(0, decodedTags.size());
   }
+
+  /**
+   * Used to confirm that we only consider truncatedMessage as EOF
+   */
+  @Test
+  public void testIsEOF() throws Exception {
+    for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
+      if (
+        method.getParameterCount() == 0
+          && method.getReturnType() == InvalidProtocolBufferException.class
+      ) {
+        method.setAccessible(true);
+        InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
+        assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index d562fe705ce..a7ca1827845 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -36,8 +36,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
-import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -358,60 +356,46 @@ public class ProtobufLogReader extends ReaderBase {
 
   @Override
   protected boolean readNext(Entry entry) throws IOException {
+    resetCompression = false;
     // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
     long originalPosition = this.inputStream.getPos();
     if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
       LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
       return false;
     }
-    WALKey.Builder builder = WALKey.newBuilder();
-    long size = 0;
     boolean resetPosition = false;
-    // by default, we should reset the compression when seeking back after reading something
-    resetCompression = true;
     try {
-      long available = -1;
+      WALKey walKey;
       try {
-        int firstByte = this.inputStream.read();
-        if (firstByte == -1) {
-          throw new EOFException();
-        }
-        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
-        // available may be < 0 on local fs for instance. If so, can't depend on it.
-        available = this.inputStream.available();
-        if (available > 0 && available < size) {
-          // if we quit here, we have just read the length, no actual data yet, which means we
-          // haven't put anything into the compression dictionary yet, so when seeking back to the
-          // last good position, we do not need to reset compression context.
-          // This is very useful for saving the extra effort for reconstructing the compression
-          // dictionary, where we need to read from the beginning instead of just seek to the
-          // position, as DFSInputStream implement the available method, so in most cases we will
-          // reach here if there are not enough data.
-          resetCompression = false;
-          throw new EOFException("Available stream not enough for edit, "
-            + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
-            + size + " at offset = " + this.inputStream.getPos());
+        walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+      } catch (InvalidProtocolBufferException e) {
+        if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+          // only convert to an EOFException if this indicates a real EOF, or we reached a partial WALTrailer
+          resetPosition = true;
+          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+            + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
+        } else {
+          throw e;
         }
-        ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
-      } catch (InvalidProtocolBufferException ipbe) {
-        resetPosition = true;
-        throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
-          + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
-          + size + ", currentAvailable=" + available).initCause(ipbe);
+      } catch (EOFException e) {
+        // append more detailed information
+        throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+          + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
       }
-      if (!builder.isInitialized()) {
-        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
-        // If we can get the KV count, we could, theoretically, try to get next record.
-        throw new EOFException("Partial PB while reading WAL, "
-          + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
-      }
-      WALKey walKey = builder.build();
       entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
       if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
         LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
           this.inputStream.getPos());
         return true;
       }
+      // Starting from here we will read cells, which changes the content of the compression
+      // dictionary, so if any of the operations below fail, when resetting we also need to clear
+      // the compression context and read from the beginning to reconstruct the compression
+      // dictionary, instead of just seeking to the position directly.
+      // Checking available bytes up front saves us that extra reconstruction effort: since
+      // DFSInputStream implements the available method, in most cases we will not even reach
+      // here when there is not enough data.
+      resetCompression = true;
       int expectedCells = walKey.getFollowingKvCount();
       long posBefore = this.inputStream.getPos();
       try {
@@ -490,6 +474,54 @@ public class ProtobufLogReader extends ReaderBase {
     return null;
   }
 
+  /**
+   * Determines whether we have already reached the WALTrailer. As the size and magic are at the
+   * end of the WAL file, it is possible that they were only partially written, in which case we
+   * consider there to be no trailer. When we actually reach the WALTrailer, we will try to decode
+   * it as a WALKey; that will fail, but the exact error can vary since we are really parsing a
+   * WALTrailer.
+   * @return whether this is a WALTrailer and we should throw EOF to tell the upper layer the file is done
+   */
+  private boolean isWALTrailer(long startPosition) throws IOException {
+    // The WALTrailer PB message currently has no fields, so on disk the trailer is just an int
+    // length (whose value is 0) plus the magic at the end
+    int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
+    if (fileLength - startPosition >= trailerSize) {
+      // We still have at least trailerSize bytes before reaching EOF, so this is not a partial
+      // trailer. The == case is also excluded because a valid, complete trailer is read while
+      // opening the reader, so we should not reach here for it
+      return false;
+    }
+    inputStream.seek(startPosition);
+    for (int i = 0; i < 4; i++) {
+      int r = inputStream.read();
+      if (r == -1) {
+        // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
+        // is a partial trailer
+        return true;
+      }
+      if (r != 0) {
+        // the length is not 0, so this cannot be a trailer
+        return false;
+      }
+    }
+    for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
+      int r = inputStream.read();
+      if (r == -1) {
+        // we have reached EOF while reading the magic, and all bytes read are matched, so we assume
+        // this is a partial trailer
+        return true;
+      }
+      if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
+        // does not match the magic, so this cannot be a trailer
+        return false;
+      }
+    }
+    // in fact we should not reach here, as this means the trailer bytes all matched and are
+    // complete, in which case this method should not have been called at all...
+    return true;
+  }
+
   @Override
   protected void seekOnFs(long pos) throws IOException {
     this.inputStream.seek(pos);
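
For reference, the file tail that isWALTrailer probes, as implied by the checks above (the write
path is not part of this diff, so this layout sketch is inferred rather than authoritative):

    // tail of a WAL file whose (empty) trailer was fully written:
    //   ... WAL edits ... | 0x00 0x00 0x00 0x00 | PB_WAL_COMPLETE_MAGIC
    //                       int length of the      trailing magic bytes
    //                       empty WALTrailer
    // A partially written trailer is any strict prefix of this suffix, so
    // "all zeros so far, then EOF" or "magic prefix matched, then EOF" is
    // treated as a truncated trailer and surfaced as a plain EOFException.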
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index a6d82323f8b..6a4800fb1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -53,6 +53,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+
 /**
  * WAL tests that can be reused across providers.
  */
@@ -111,6 +113,9 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
    */
   @Test
   public void testWALTrailer() throws IOException {
+    // make sure that the serialized size of an empty WALTrailer is 0; we rely on this assumption
+    // when reading a partial WALTrailer
+    assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
     // read With trailer.
     doRead(true);
     // read without trailer