You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/04/29 05:30:10 UTC

[hbase] branch branch-2.2 updated: HBASE-23833. The relocated hadoop-thirdparty protobuf breaks HBase asyncwal (#1301) (#1535) (#1567)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 6a2314d  HBASE-23833. The relocated hadoop-thirdparty protobuf breaks HBase asyncwal	 (#1301) (#1535) (#1567)
6a2314d is described below

commit 6a2314dceb2b88a019b0bc09a0e75ba9060a0833
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Tue Apr 28 22:30:00 2020 -0700

    HBASE-23833. The relocated hadoop-thirdparty protobuf breaks HBase asyncwal	 (#1301) (#1535) (#1567)
    
    * Use Reflection to access shaded Hadoop protobuf classes.
    
    (cherry picked from commit a321e536989083ca3620bf2c53f12c07740bf5b0)
    
    * Update to improve the code:
    
    1. Added license.
    2. Added more comments.
    3. Wrap byte array instead of copy to make a ByteString.
    4. Moved all reflection instantiation to static class loading time.
    
    * Use LiteralByteString to wrap byte array instead of copying it.
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: stack <st...@apache.org>
    (cherry picked from commit 72727ff9be9bc626522247aad9155938656fbfdb)
    (cherry picked from commit ae3de38bedf7e0a8b418ce50c05956104b4f8621)
    (cherry picked from commit b8b8e0afd494f073f6bdff8471aa53a6764b1257)
---
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |   1 -
 .../FanOutOneBlockAsyncDFSOutputHelper.java        |   1 -
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    |  87 +++++++++++-
 .../hadoop/hbase/io/asyncfs/ProtobufDecoder.java   | 148 +++++++++++++++++++++
 4 files changed, 230 insertions(+), 7 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 7a429c2..ed9da5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -69,7 +69,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 5eb2004..be53781 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -101,7 +101,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index 271fa16..c4c9bc5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -94,7 +94,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
@@ -356,15 +355,93 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
     }
 
+    /**
+     * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty.
+     * After Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*.
+     * Use Reflection to check which ones to use.
+     */
+    private static class BuilderPayloadSetter {
+      private static Method setPayloadMethod;
+      private static Constructor<?> constructor;
+
+      /**
+       * Create a ByteString from byte array without copying (wrap), and then set it as the payload
+       * for the builder.
+       *
+       * @param builder builder for HDFS DataTransferEncryptorMessage.
+       * @param payload byte array of payload.
+       * @throws IOException
+       */
+      static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, byte[] payload)
+        throws IOException {
+        Object byteStringObject;
+        try {
+          // byteStringObject = new LiteralByteString(payload);
+          byteStringObject = constructor.newInstance(payload);
+          // builder.setPayload(byteStringObject);
+          setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject));
+        } catch (IllegalAccessException | InstantiationException e) {
+          throw new RuntimeException(e);
+
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        }
+      }
+
+      static {
+        Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class;
+
+        // Try the unrelocated ByteString
+        Class<?> byteStringClass = com.google.protobuf.ByteString.class;
+        try {
+          // See if it can load the relocated ByteString, which comes from hadoop-thirdparty.
+          byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
+          LOG.debug("Found relocated ByteString class from hadoop-thirdparty." +
+            " Assuming this is Hadoop 3.3.0+.");
+        } catch (ClassNotFoundException e) {
+          LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." +
+            " Assuming this is below Hadoop 3.3.0", e);
+        }
+
+        // LiteralByteString is a package private class in protobuf. Make it accessible.
+        Class<?> literalByteStringClass;
+        try {
+          literalByteStringClass = Class.forName(
+            "org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
+          LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
+        } catch (ClassNotFoundException e) {
+          try {
+            literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString");
+            LOG.debug("com.google.protobuf.LiteralByteString found.");
+          } catch (ClassNotFoundException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+
+        try {
+          constructor = literalByteStringClass.getDeclaredConstructor(byte[].class);
+          constructor.setAccessible(true);
+        } catch (NoSuchMethodException e) {
+          throw new RuntimeException(e);
+        }
+
+        try {
+          setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass);
+        } catch (NoSuchMethodException e) {
+          // if either method is not found, we are in big trouble. Abort.
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
     private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
         List<CipherOption> options) throws IOException {
       DataTransferEncryptorMessageProto.Builder builder =
           DataTransferEncryptorMessageProto.newBuilder();
       builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
       if (payload != null) {
-        // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
-        // and we want to keep that out of hbase-server.
-        builder.setPayload(ByteString.copyFrom(payload));
+        BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
       }
       if (options != null) {
         builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
new file mode 100644
index 0000000..98b4e6f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/ProtobufDecoder.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufUtil;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Modified based on io.netty.handler.codec.protobuf.ProtobufDecoder.
+ * The Netty's ProtobufDecode supports unshaded protobuf messages (com.google.protobuf).
+ *
+ * Hadoop 3.3.0 and above relocates protobuf classes to a shaded jar (hadoop-thirdparty), and
+ * so we must use reflection to detect which one (relocated or not) to use.
+ *
+ * Do not use this to process HBase's shaded protobuf messages. This is meant to process the
+ * protobuf messages in HDFS for the asyncfs use case.
+ * */
+@InterfaceAudience.Private
+public class ProtobufDecoder extends MessageToMessageDecoder<ByteBuf> {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ProtobufDecoder.class);
+
+  private static Class<?> protobufMessageLiteClass = null;
+  private static Class<?> protobufMessageLiteBuilderClass = null;
+
+  private static final boolean HAS_PARSER;
+
+  private static Method getParserForTypeMethod;
+  private static Method newBuilderForTypeMethod;
+
+  private Method parseFromMethod;
+  private Method mergeFromMethod;
+  private Method buildMethod;
+
+  private Object parser;
+  private Object builder;
+
+
+  public ProtobufDecoder(Object prototype) {
+    try {
+      Method getDefaultInstanceForTypeMethod = protobufMessageLiteClass.getMethod(
+        "getDefaultInstanceForType");
+      Object prototype1 = getDefaultInstanceForTypeMethod
+        .invoke(ObjectUtil.checkNotNull(prototype, "prototype"));
+
+      // parser = prototype.getParserForType()
+      parser = getParserForTypeMethod.invoke(prototype1);
+      parseFromMethod = parser.getClass().getMethod(
+        "parseFrom", byte[].class, int.class, int.class);
+
+      // builder = prototype.newBuilderForType();
+      builder = newBuilderForTypeMethod.invoke(prototype1);
+      mergeFromMethod = builder.getClass().getMethod(
+        "mergeFrom", byte[].class, int.class, int.class);
+
+      // All protobuf message builders inherits from MessageLite.Builder
+      buildMethod = protobufMessageLiteBuilderClass.getDeclaredMethod("build");
+
+    } catch (IllegalAccessException | NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e.getTargetException());
+    }
+  }
+
+  protected void decode(
+    ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
+    int length = msg.readableBytes();
+    byte[] array;
+    int offset;
+    if (msg.hasArray()) {
+      array = msg.array();
+      offset = msg.arrayOffset() + msg.readerIndex();
+    } else {
+      array = ByteBufUtil.getBytes(msg, msg.readerIndex(), length, false);
+      offset = 0;
+    }
+
+    Object addObj;
+    if (HAS_PARSER) {
+      // addObj = parser.parseFrom(array, offset, length);
+      addObj = parseFromMethod.invoke(parser, array, offset, length);
+    } else {
+      // addObj = builder.mergeFrom(array, offset, length).build();
+      Object builderObj = mergeFromMethod.invoke(builder, array, offset, length);
+      addObj = buildMethod.invoke(builderObj);
+    }
+    out.add(addObj);
+  }
+
+  static {
+    boolean hasParser = false;
+
+    // These are the protobuf classes coming from Hadoop. Not the one from hbase-shaded-protobuf
+    protobufMessageLiteClass = com.google.protobuf.MessageLite.class;
+    protobufMessageLiteBuilderClass = com.google.protobuf.MessageLite.Builder.class;
+
+    try {
+      protobufMessageLiteClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.MessageLite");
+      protobufMessageLiteBuilderClass = Class.forName(
+        "org.apache.hadoop.thirdparty.protobuf.MessageLite.Builder");
+      LOG.debug("Hadoop 3.3 and above shades protobuf.");
+    } catch (ClassNotFoundException e) {
+      LOG.debug("Hadoop 3.2 and below use unshaded protobuf.", e);
+    }
+
+    try {
+      getParserForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("getParserForType");
+      newBuilderForTypeMethod = protobufMessageLiteClass.getDeclaredMethod("newBuilderForType");
+    } catch (NoSuchMethodException e) {
+      // If the method is not found, we are in trouble. Abort.
+      throw new RuntimeException(e);
+    }
+
+    try {
+      protobufMessageLiteClass.getDeclaredMethod("getParserForType");
+      hasParser = true;
+    } catch (Throwable var2) {
+    }
+
+    HAS_PARSER = hasParser;
+  }
+}