Posted to issues@hbase.apache.org by "virajjasani (via GitHub)" <gi...@apache.org> on 2023/02/26 19:15:30 UTC

[GitHub] [hbase] virajjasani commented on a diff in pull request #5059: HBASE-27668 PB's parseDelimitedFrom can successfully return when ther…

virajjasani commented on code in PR #5059:
URL: https://github.com/apache/hbase/pull/5059#discussion_r1118135225


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java:
##########
@@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
       .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
   }
 
+  /**
+   * Check whether this IPBE indicates EOF or not.
+   * <p/>
+   * We check the exception message: if it looks like the one produced by
+   * InvalidProtocolBufferException.truncatedMessage, we consider it an EOF, otherwise not.
+   */
+  public static boolean isEOF(InvalidProtocolBufferException e) {
+    return e.getMessage().contains("input has been truncated");

Review Comment:
   I wonder if PB provides some sort of constant for "input has been truncated" just so that we can use it. I am just worried about PB version compatibility.
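
   If PB does not expose the constant (the truncatedMessage() factory is package-private in protobuf-java, as far as I know), one hedge against version drift would be to learn the message text from PB itself at class-init time instead of hard-coding it. A rough, untested sketch; `TRUNCATED_MESSAGE` is an invented name and `Any` is just a convenient generated message:
   ```java
   // Hypothetical: derive the "truncated" message text from the linked PB
   // version by deliberately parsing a truncated input.
   private static final String TRUNCATED_MESSAGE;
   static {
     String msg = "input has been truncated"; // fallback if the probe fails
     try {
       // A lone byte with the varint continuation bit set is an incomplete
       // tag, so parsing it must fail with truncatedMessage().
       Any.parseFrom(new byte[] { (byte) 0x80 });
     } catch (InvalidProtocolBufferException e) {
       msg = e.getMessage();
     }
     TRUNCATED_MESSAGE = msg;
   }

   public static boolean isEOF(InvalidProtocolBufferException e) {
     return e.getMessage().contains(TRUNCATED_MESSAGE);
   }
   ```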



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java:
##########
@@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
       .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
   }
 
+  /**
+   * Check whether this IPBE indicates EOF or not.
+   * <p/>
+   * We check the exception message: if it looks like the one produced by
+   * InvalidProtocolBufferException.truncatedMessage, we consider it an EOF, otherwise not.
+   */
+  public static boolean isEOF(InvalidProtocolBufferException e) {
+    return e.getMessage().contains("input has been truncated");
+  }
+
+  /**
+   * This is a wrapper around the PB message's parseDelimitedFrom. The difference is, if we cannot
+   * determine whether there are enough bytes in the stream, i.e., the available method does not
+   * have a valid return value, we will try to read all the bytes into a byte array first, and then
+   * parse the PB message with {@link Parser#parseFrom(byte[])} instead of calling
+   * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because, even if there are
+   * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without
+   * any errors, just leaving us a partial PB message.
+   * @return The PB message if we can parse it successfully; otherwise an exception will always be
+   *         thrown. This method never returns {@code null}.
+   */
+  public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
+    throws IOException {
+    int firstByte = in.read();
+    if (firstByte < 0) {
+      throw new EOFException("EOF while reading message size");
+    }
+    int size = CodedInputStream.readRawVarint32(firstByte, in);
+    int available = in.available();
+    if (available > 0) {
+      if (available < size) {
+        throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+          + size + " bytes, but only " + available + " bytes available");
+      }
+      // this piece of code is copied from GeneratedMessageV3.parseFrom
+      try {
+        return parser.parseFrom(ByteStreams.limit(in, size));
+      } catch (InvalidProtocolBufferException e) {
+        throw e.unwrapIOException();

Review Comment:
   Shall we also log `InvalidProtocolBufferException` before throwing here?
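
   Something like the following, assuming `ProtobufUtil` has (or gains) a slf4j `LOG` field — a sketch of the idea, not the actual patch:
   ```java
   } catch (InvalidProtocolBufferException e) {
     // Hypothetical: record the original shaded PB exception before unwrapping,
     // since unwrapIOException() may surface only the wrapped IOException cause.
     LOG.debug("Failed to parse delimited PB message", e);
     throw e.unwrapIOException();
   }
   ```

   Side note on the javadoc claim above, that `Parser#parseDelimitedFrom(InputStream)` can return cleanly on a short stream: truncation at a field boundary is indistinguishable from the end of the message, so the behaviour is easy to reproduce with plain protobuf-java. A self-contained sketch (`Any` is just a convenient generated message; the class name is invented):
   ```java
   import com.google.protobuf.Any;
   import com.google.protobuf.ByteString;

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.util.Arrays;

   public class TruncationDemo {
     public static void main(String[] args) throws Exception {
       Any msg = Any.newBuilder()
         .setTypeUrl("type.googleapis.com/demo")
         .setValue(ByteString.copyFromUtf8("payload"))
         .build();
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
       msg.writeDelimitedTo(bos);
       byte[] full = bos.toByteArray();
       // Chop off the whole `value` field (1 byte tag + 1 byte length + 7 bytes
       // payload) while keeping the size prefix that still promises more bytes.
       byte[] truncated = Arrays.copyOf(full, full.length - 9);
       // No exception here: EOF at a field boundary looks like end-of-message.
       Any parsed = Any.parseDelimitedFrom(new ByteArrayInputStream(truncated));
       System.out.println("value lost silently: " + parsed.getValue().isEmpty());
     }
   }
   ```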



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java:
##########
@@ -358,60 +356,46 @@ protected Compression.Algorithm getValueCompressionAlgorithm() {
 
   @Override
   protected boolean readNext(Entry entry) throws IOException {
+    resetCompression = false;
     // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
     long originalPosition = this.inputStream.getPos();
     if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
       LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
       return false;
     }
-    WALKey.Builder builder = WALKey.newBuilder();
-    long size = 0;
     boolean resetPosition = false;
-    // by default, we should reset the compression when seeking back after reading something
-    resetCompression = true;
     try {
-      long available = -1;
+      WALKey walKey;
       try {
-        int firstByte = this.inputStream.read();
-        if (firstByte == -1) {
-          throw new EOFException();
-        }
-        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
-        // available may be < 0 on local fs for instance. If so, can't depend on it.
-        available = this.inputStream.available();
-        if (available > 0 && available < size) {
-          // if we quit here, we have just read the length, no actual data yet, which means we
-          // haven't put anything into the compression dictionary yet, so when seeking back to the
-          // last good position, we do not need to reset compression context.
-          // This is very useful for saving the extra effort for reconstructing the compression
-          // dictionary, where we need to read from the beginning instead of just seek to the
-          // position, as DFSInputStream implement the available method, so in most cases we will
-          // reach here if there are not enough data.
-          resetCompression = false;
-          throw new EOFException("Available stream not enough for edit, "
-            + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
-            + size + " at offset = " + this.inputStream.getPos());
+        walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+      } catch (InvalidProtocolBufferException e) {
+        if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+          // only rethrow as EOF if the exception indicates an actual EOF, or we have reached a
+          // partial WALTrailer
+          resetPosition = true;
+          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+            + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
+        } else {
+          throw e;
         }
-        ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
-      } catch (InvalidProtocolBufferException ipbe) {
-        resetPosition = true;
-        throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
-          + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
-          + size + ", currentAvailable=" + available).initCause(ipbe);
+      } catch (EOFException e) {
+        // append more detailed information
+        throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+          + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
       }
-      if (!builder.isInitialized()) {
-        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
-        // If we can get the KV count, we could, theoretically, try to get next record.
-        throw new EOFException("Partial PB while reading WAL, "
-          + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
-      }
-      WALKey walKey = builder.build();
       entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
       if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
         LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
           this.inputStream.getPos());
         return true;
       }
+      // Starting from here we will read cells, which will change the content of the compression
+      // dictionary, so if we fail in any of the operations below, when resetting we also need to
+      // clear the compression context and read from the beginning to reconstruct the compression
+      // dictionary, instead of seeking to the position directly.
+      // Failing before this point saves that extra reconstruction effort: DFSInputStream
+      // implements the available method, so when there is not enough data we will usually fail
+      // earlier, while resetCompression is still false.
+      resetCompression = true;

Review Comment:
   Interesting. So this was introduced by HBASE-27621, and it looks like it has not made it to a live release yet, correct?
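
   For context on what the flag buys: while `resetCompression` is false the reader can seek straight back to the last good position, but once it is true the dictionary has to be rebuilt by replaying from the start. A rough illustration of the recovery path — hypothetical, with invented names, not the actual HBase code:
   ```java
   // Hypothetical sketch: how a reader could act on resetCompression when
   // recovering to the last good position after a partial read.
   private void recoverTo(long lastGoodPosition) throws IOException {
     if (resetCompression && compressionContext != null) {
       // Cells were already fed into the dictionary, so it is out of sync with
       // lastGoodPosition; rebuild it by replaying from the beginning.
       compressionContext.clear();
       seekOnFs(0);
       while (getPosition() < lastGoodPosition && readNext(new Entry())) {
         // replaying entries repopulates the compression dictionary
       }
     } else {
       // Nothing new went into the dictionary; a plain seek is enough.
       seekOnFs(lastGoodPosition);
     }
   }
   ```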



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org