You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/02/28 02:08:38 UTC
[hbase] branch branch-2.5 updated: HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new c319c6f0a31 HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)
c319c6f0a31 is described below
commit c319c6f0a316446342fd9a03ba72d14e8dc7cb36
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Feb 28 09:47:37 2023 +0800
HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)
Signed-off-by: Viraj Jasani <vj...@apache.org>
(cherry picked from commit d1fede72c340f6eeb145410e88b089e636fd6f5e)
---
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 50 ++++++++++
.../hbase/shaded/protobuf/TestProtobufUtil.java | 19 ++++
.../hbase/regionserver/wal/ProtobufLogReader.java | 110 +++++++++++++--------
.../regionserver/wal/AbstractTestProtobufLog.java | 5 +
4 files changed, 145 insertions(+), 39 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1c1125f6aea..68377bde13b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf;
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
@@ -133,6 +134,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@@ -3567,4 +3569,52 @@ public final class ProtobufUtil {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}
+ /**
+ * Check whether this IPBE indicates EOF or not.
+ * <p/>
+ * We will check the exception message, if it is likely the one of
+ * InvalidProtocolBufferException.truncatedMessage, we will consider it as EOF, otherwise not.
+ */
+ public static boolean isEOF(InvalidProtocolBufferException e) {
+ return e.getMessage().contains("input has been truncated");
+ }
+
+ /**
+ * This is a wrapper of the PB message's parseDelimitedFrom. The difference is, if we can not
+ * determine whether there are enough bytes in stream, i.e, the available method does not have a
+ * valid return value, we will try to read all the bytes to a byte array first, and then parse the
+ * pb message with {@link Parser#parseFrom(byte[])} instead of call
+ * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because even if there are
+ * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without any
+ * errors but just leave us a partial PB message.
+ * @return The PB message if we can parse it successfully, otherwise there will always be an
+ * exception thrown, will never return {@code null}.
+ */
+ public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
+ throws IOException {
+ int firstByte = in.read();
+ if (firstByte < 0) {
+ throw new EOFException("EOF while reading message size");
+ }
+ int size = CodedInputStream.readRawVarint32(firstByte, in);
+ int available = in.available();
+ if (available > 0) {
+ if (available < size) {
+ throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+ + size + " bytes, but only " + available + " bytes available");
+ }
+ // this piece of code is copied from GeneratedMessageV3.parseFrom
+ try {
+ return parser.parseFrom(ByteStreams.limit(in, size));
+ } catch (InvalidProtocolBufferException e) {
+ throw e.unwrapIOException();
+ }
+ } else {
+ // this usually means the stream does not have a proper available implementation, let's read
+ // the content to a byte array before parsing.
+ byte[] bytes = new byte[size];
+ ByteStreams.readFully(in, bytes);
+ return parser.parseFrom(bytes);
+ }
+ }
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index b27d832ee8c..fc442b8998d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -53,6 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -574,4 +576,21 @@ public class TestProtobufUtil {
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
assertEquals(0, decodedTags.size());
}
+
+ /**
+ * Used to confirm that we only consider truncatedMessage as EOF
+ */
+ @Test
+ public void testIsEOF() throws Exception {
+ for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
+ if (
+ method.getParameterCount() == 0
+ && method.getReturnType() == InvalidProtocolBufferException.class
+ ) {
+ method.setAccessible(true);
+ InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
+ assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
+ }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index d562fe705ce..a7ca1827845 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -36,8 +36,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
-import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -358,60 +356,46 @@ public class ProtobufLogReader extends ReaderBase {
@Override
protected boolean readNext(Entry entry) throws IOException {
+ resetCompression = false;
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
return false;
}
- WALKey.Builder builder = WALKey.newBuilder();
- long size = 0;
boolean resetPosition = false;
- // by default, we should reset the compression when seeking back after reading something
- resetCompression = true;
try {
- long available = -1;
+ WALKey walKey;
try {
- int firstByte = this.inputStream.read();
- if (firstByte == -1) {
- throw new EOFException();
- }
- size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
- // available may be < 0 on local fs for instance. If so, can't depend on it.
- available = this.inputStream.available();
- if (available > 0 && available < size) {
- // if we quit here, we have just read the length, no actual data yet, which means we
- // haven't put anything into the compression dictionary yet, so when seeking back to the
- // last good position, we do not need to reset compression context.
- // This is very useful for saving the extra effort for reconstructing the compression
- // dictionary, where we need to read from the beginning instead of just seek to the
- // position, as DFSInputStream implement the available method, so in most cases we will
- // reach here if there are not enough data.
- resetCompression = false;
- throw new EOFException("Available stream not enough for edit, "
- + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
- + size + " at offset = " + this.inputStream.getPos());
+ walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+ } catch (InvalidProtocolBufferException e) {
+ if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+ // only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
+ resetPosition = true;
+ throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+ + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
+ } else {
+ throw e;
}
- ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
- } catch (InvalidProtocolBufferException ipbe) {
- resetPosition = true;
- throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
- + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
- + size + ", currentAvailable=" + available).initCause(ipbe);
+ } catch (EOFException e) {
+ // append more detailed information
+ throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+ + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
}
- if (!builder.isInitialized()) {
- // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
- // If we can get the KV count, we could, theoretically, try to get next record.
- throw new EOFException("Partial PB while reading WAL, "
- + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
- }
- WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
return true;
}
+ // Starting from here, we will start to read cells, which will change the content in
+ // compression dictionary, so if we fail in the below operations, when resetting, we also need
+ // to clear the compression context, and read from the beginning to reconstruct the
+ // compression dictionary, instead of seeking to the position directly.
+ // This is very useful for saving the extra effort for reconstructing the compression
+ // dictionary, as DFSInputStream implement the available method, so in most cases we will
+ // not reach here if there are not enough data.
+ resetCompression = true;
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
@@ -490,6 +474,54 @@ public class ProtobufLogReader extends ReaderBase {
return null;
}
+ /**
+ * This is used to determine whether we have already reached the WALTrailer. As the size and magic
+ * are at the end of the WAL file, it is possible that these two options are missing while
+ * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
+ * will try to decode it as WALKey and we will fail but the error could vary as it is parsing
+ * WALTrailer actually.
+ * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
+ */
+ private boolean isWALTrailer(long startPosition) throws IOException {
+ // We have nothing in the WALTrailer PB message now so its size is just an int length and a
+ // magic at the end
+ int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
+ if (fileLength - startPosition >= trailerSize) {
+ // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
+ // We also test for == here because if this is a valid trailer, we can read it while opening
+ // the reader so we should not reach here
+ return false;
+ }
+ inputStream.seek(startPosition);
+ for (int i = 0; i < 4; i++) {
+ int r = inputStream.read();
+ if (r == -1) {
+ // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
+ // is a partial trailer
+ return true;
+ }
+ if (r != 0) {
+ // the length is not 0, should not be a trailer
+ return false;
+ }
+ }
+ for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
+ int r = inputStream.read();
+ if (r == -1) {
+ // we have reached EOF while reading the magic, and all bytes read are matched, so we assume
+ // this is a partial trailer
+ return true;
+ }
+ if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
+ // does not match magic, should not be a trailer
+ return false;
+ }
+ }
+ // in fact we should not reach here, as this means the trailer bytes are all matched and
+ // complete, then we should not call this method...
+ return true;
+ }
+
@Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index a6d82323f8b..6a4800fb1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -53,6 +53,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+
/**
* WAL tests that can be reused across providers.
*/
@@ -111,6 +113,9 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
*/
@Test
public void testWALTrailer() throws IOException {
+ // make sure that the size for WALTrailer is 0, we need this assumption when reading a partial
+ // WALTrailer
+ assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
// read With trailer.
doRead(true);
// read without trailer