You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/04/17 19:08:56 UTC
[hbase] branch branch-2 updated: HBASE-23833. The relocated
hadoop-thirdparty protobuf breaks HBase asyncwal (#1301) (#1534)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 98ecd58 HBASE-23833. The relocated hadoop-thirdparty protobuf breaks HBase asyncwal (#1301) (#1534)
98ecd58 is described below
commit 98ecd584c4992aec9d3611110861cfc60d4bdc58
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Fri Apr 17 12:08:02 2020 -0700
HBASE-23833. The relocated hadoop-thirdparty protobuf breaks HBase asyncwal (#1301) (#1534)
* Use Reflection to access shaded Hadoop protobuf classes.
(cherry picked from commit a321e536989083ca3620bf2c53f12c07740bf5b0)
* Update to improve the code:
1. Added license.
2. Added more comments.
3. Wrap byte array instead of copy to make a ByteString.
4. Moved all reflection instantiation to static class loading time.
* Use LiteralByteString to wrap byte array instead of copying it.
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: stack <st...@apache.org>
(cherry picked from commit 72727ff9be9bc626522247aad9155938656fbfdb)
---
.../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 1 -
.../FanOutOneBlockAsyncDFSOutputHelper.java | 1 -
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 87 +++++++++++-
.../hadoop/hbase/io/asyncfs/ProtobufDecoder.java | 148 +++++++++++++++++++++
4 files changed, 230 insertions(+), 7 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 7a429c2..ed9da5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -69,7 +69,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 32cdfcf..e372726 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -99,7 +99,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index 59215de..090b9b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
-import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -93,7 +93,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
@@ -355,15 +354,93 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
}
+  /**
+   * Sets the payload of a {@link DataTransferEncryptorMessageProto.Builder} through reflection.
+   * <p>
+   * The asyncfs subsystem emulates an HDFS client by sending protobuf messages via netty.
+   * From Hadoop 3.3.0 on, the protobuf classes used by HDFS are relocated to
+   * org.apache.hadoop.thirdparty.protobuf.*, so the ByteString type accepted by
+   * {@code setPayload} differs between Hadoop versions. The matching classes are looked up
+   * once, when this class is loaded.
+   */
+  private static class BuilderPayloadSetter {
+    private static Method setPayloadMethod;
+    private static Constructor<?> constructor;
+
+    static {
+      // Resolution order matters for the debug output: ByteString first, then LiteralByteString.
+      Class<?> payloadType = resolveByteStringClass();
+      Class<?> literalType = resolveLiteralByteStringClass();
+
+      try {
+        // LiteralByteString is not a public class in protobuf, so open up its constructor.
+        constructor = literalType.getDeclaredConstructor(byte[].class);
+        constructor.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        throw new RuntimeException(e);
+      }
+
+      try {
+        setPayloadMethod =
+          DataTransferEncryptorMessageProto.Builder.class.getMethod("setPayload", payloadType);
+      } catch (NoSuchMethodException e) {
+        // Without setPayload there is no way to send SASL messages. Abort.
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Picks the ByteString class used as the parameter type of {@code setPayload}: the one
+     * relocated by hadoop-thirdparty when it can be loaded, otherwise the unrelocated one.
+     */
+    private static Class<?> resolveByteStringClass() {
+      Class<?> unrelocated = com.google.protobuf.ByteString.class;
+      try {
+        Class<?> relocated =
+          Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
+        LOG.debug("Found relocated ByteString class from hadoop-thirdparty." +
+          " Assuming this is Hadoop 3.3.0+.");
+        return relocated;
+      } catch (ClassNotFoundException e) {
+        LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." +
+          " Assuming this is below Hadoop 3.3.0", e);
+        return unrelocated;
+      }
+    }
+
+    /**
+     * Picks the LiteralByteString class to instantiate: the shaded nested class from
+     * hadoop-thirdparty when it can be loaded, otherwise com.google.protobuf.LiteralByteString.
+     *
+     * @throws RuntimeException if neither class can be loaded.
+     */
+    private static Class<?> resolveLiteralByteStringClass() {
+      try {
+        Class<?> shaded = Class.forName(
+          "org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
+        LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
+        return shaded;
+      } catch (ClassNotFoundException shadedMissing) {
+        try {
+          Class<?> unshaded = Class.forName("com.google.protobuf.LiteralByteString");
+          LOG.debug("com.google.protobuf.LiteralByteString found.");
+          return unshaded;
+        } catch (ClassNotFoundException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    /**
+     * Builds a LiteralByteString from the byte array via its {@code byte[]} constructor (so the
+     * array is handed over as-is rather than going through {@code ByteString.copyFrom}), and
+     * sets it as the payload of the builder.
+     *
+     * @param builder builder for HDFS DataTransferEncryptorMessage.
+     * @param payload byte array of payload.
+     * @throws IOException if the reflective call itself fails with an IOException.
+     */
+    static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload)
+      throws IOException {
+      try {
+        // Equivalent to: builder.setPayload(new LiteralByteString(payload));
+        Object wrapped = constructor.newInstance(payload);
+        setPayloadMethod.invoke(builder, wrapped);
+      } catch (InvocationTargetException e) {
+        Throwable cause = e.getTargetException();
+        Throwables.propagateIfPossible(cause, IOException.class);
+        throw new RuntimeException(cause);
+      } catch (IllegalAccessException | InstantiationException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
List<CipherOption> options) throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
if (payload != null) {
- // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
- // and we want to keep that out of hbase-server.
- builder.setPayload(ByteString.copyFrom(payload));
+ BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
}
if (options != null) {
builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
new file mode 100644
index 0000000..98b4e6f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder.
+ * Netty's ProtobufDecoder only supports unshaded protobuf messages (com.google.protobuf).
+ *
+ * Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and
+ * so we must use reflection to detect which one (relocated or not) to use.
+ *
+ * Do not use this to process HBase's shaded protobuf messages. This is meant to process the
+ * protobuf messages in HDFS for the asyncfs use case.
+ */
+@InterfaceAudience.Private
+public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ProtobufDecoder.class);
+
+  // MessageLite and MessageLite.Builder as used by Hadoop (relocated or not). Both are always
+  // taken from the same protobuf flavor; see the static initializer.
+  private static Class<?> protobufMessageLiteClass = null;
+  private static Class<?> protobufMessageLiteBuilderClass = null;
+
+  // True when MessageLite declares getParserForType(); otherwise the builder path is used.
+  private static final boolean HAS_PARSER;
+
+  // Null when HAS_PARSER is false.
+  private static Method getParserForTypeMethod;
+  private static Method newBuilderForTypeMethod;
+
+  // Default instance of the message type this decoder produces.
+  private final Object defaultInstance;
+
+  // Parser path (HAS_PARSER == true); null otherwise.
+  private final Object parser;
+  private final Method parseFromMethod;
+
+  // Builder path (HAS_PARSER == false); null otherwise.
+  private final Method mergeFromMethod;
+  private final Method buildMethod;
+
+  /**
+   * Creates a decoder that produces messages of the same type as the given prototype.
+   *
+   * @param prototype an instance of the protobuf message type to decode; must not be null and
+   *   must be an instance of the MessageLite class detected at class-loading time.
+   * @throws RuntimeException if one of the required protobuf methods cannot be found or invoked.
+   */
+  public ProtobufDecoder(Object prototype) {
+    Object instance;
+    Object parserObj = null;
+    Method parseFrom = null;
+    Method mergeFrom = null;
+    Method build = null;
+    try {
+      Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod(
+        "getDefaultInstanceForType");
+      instance = getDefaultInstanceForTypeMethod
+        .invoke(ObjectUtil.checkNotNull(prototype, "prototype"));
+
+      if (HAS_PARSER) {
+        // parser = prototype.getParserForType()
+        parserObj = getParserForTypeMethod.invoke(instance);
+        parseFrom = parserObj.getClass().getMethod(
+          "parseFrom", byte[].class, int.class, int.class);
+      } else {
+        // A throw-away builder is only needed here to find the concrete mergeFrom method;
+        // decode() creates a fresh builder for every message.
+        Object probeBuilder = newBuilderForTypeMethod.invoke(instance);
+        mergeFrom = probeBuilder.getClass().getMethod(
+          "mergeFrom", byte[].class, int.class, int.class);
+
+        // All protobuf message builders inherit from MessageLite.Builder
+        build = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
+      }
+    } catch (IllegalAccessException | NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e.getTargetException());
+    }
+    this.defaultInstance = instance;
+    this.parser = parserObj;
+    this.parseFromMethod = parseFrom;
+    this.mergeFromMethod = mergeFrom;
+    this.buildMethod = build;
+  }
+
+  /**
+   * Parses the readable bytes of {@code msg} into a protobuf message and adds it to {@code out}.
+   *
+   * @param ctx the channel handler context (unused here).
+   * @param msg the buffer holding exactly one serialized message.
+   * @param out the list the decoded message is appended to.
+   * @throws Exception the exception raised by the protobuf parser, unwrapped from the
+   *   reflective call, or any reflection failure.
+   */
+  @Override
+  protected void decode(
+    ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
+    int length = msg.readableBytes();
+    byte[] array;
+    int offset;
+    if (msg.hasArray()) {
+      // Heap buffer: parse straight out of the backing array.
+      array = msg.array();
+      offset = msg.arrayOffset() + msg.readerIndex();
+    } else {
+      array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
+      offset = 0;
+    }
+
+    Object decoded;
+    try {
+      if (HAS_PARSER) {
+        // decoded = parser.parseFrom(array, offset, length);
+        decoded = parseFromMethod.invoke(parser, array, offset, length);
+      } else {
+        // decoded = prototype.newBuilderForType().mergeFrom(array, offset, length).build();
+        // A new builder per message: mergeFrom() on a reused builder would carry fields over
+        // from previously decoded messages.
+        Object freshBuilder = newBuilderForTypeMethod.invoke(defaultInstance);
+        Object merged = mergeFromMethod.invoke(freshBuilder, array, offset, length);
+        decoded = buildMethod.invoke(merged);
+      }
+    } catch (InvocationTargetException e) {
+      // Surface the real parse failure instead of the reflection wrapper.
+      Throwable cause = e.getCause();
+      if (cause instanceof Exception) {
+        throw (Exception) cause;
+      }
+      if (cause instanceof Error) {
+        throw (Error) cause;
+      }
+      throw e;
+    }
+    out.add(decoded);
+  }
+
+  static {
+    // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
+    Class<?> messageLiteClass = com.google.protobuf.MessageLite.class;
+    Class<?> messageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class;
+
+    try {
+      // Class.forName needs the binary name, so the nested Builder is "MessageLite$Builder".
+      // Load both before switching so the two classes never come from different flavors.
+      Class<?> shadedMessageLite =
+        Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
+      Class<?> shadedMessageLiteBuilder =
+        Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite$Builder");
+      messageLiteClass = shadedMessageLite;
+      messageLiteBuilderClass = shadedMessageLiteBuilder;
+      LOG.debug("Hadoop 3.3 and above shades protobuf.");
+    } catch (ClassNotFoundException e) {
+      LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
+    }
+    protobufMessageLiteClass = messageLiteClass;
+    protobufMessageLiteBuilderClass = messageLiteBuilderClass;
+
+    try {
+      newBuilderForTypeMethod = messageLiteClass.getDeclaredMethod("newBuilderForType");
+    } catch (NoSuchMethodException e) {
+      // If the method is not found, we are in trouble. Abort.
+      throw new RuntimeException(e);
+    }
+
+    // getParserForType() is optional: when it is missing, decode() falls back to builders.
+    Method parserGetter = null;
+    try {
+      parserGetter = messageLiteClass.getDeclaredMethod("getParserForType");
+    } catch (NoSuchMethodException e) {
+      LOG.debug("MessageLite has no getParserForType method; decoding with builders.", e);
+    }
+    getParserForTypeMethod = parserGetter;
+    HAS_PARSER = parserGetter != null;
+  }
+}