You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2023/02/01 13:33:25 UTC

[hadoop] branch branch-3.3 updated: HADOOP-18584. [NFS GW] Fix regression after netty4 migration. (#5252)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4836f1ec37a HADOOP-18584. [NFS GW] Fix regression after netty4 migration. (#5252)
4836f1ec37a is described below

commit 4836f1ec37ad9204c5c39c7f9f29a234bed6cd9c
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Jan 30 09:17:04 2023 -0800

    HADOOP-18584. [NFS GW] Fix regression after netty4 migration. (#5252)
    
    Reviewed-by: Tsz-Wo Nicholas Sze <sz...@apache.org>
    (cherry picked from commit 9d47108b50fb0cd79ca48e82077e57572d8873e6)
---
 .../main/java/org/apache/hadoop/oncrpc/RpcUtil.java   | 19 ++++++++++++-------
 .../main/java/org/apache/hadoop/portmap/Portmap.java  |  6 ++----
 .../java/org/apache/hadoop/portmap/TestPortmap.java   | 17 +++++++++++++++++
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
index 784e8c79618..92354f6b86c 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -26,6 +27,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.DatagramPacket;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -172,15 +174,18 @@ public final class RpcUtil {
    */
   @ChannelHandler.Sharable
   private static final class RpcUdpResponseStage extends
-      ChannelInboundHandlerAdapter {
+          SimpleChannelInboundHandler<RpcResponse> {
+    public RpcUdpResponseStage() {
+      // do not auto release the RpcResponse message.
+      super(false);
+    }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      RpcResponse r = (RpcResponse) msg;
-      // TODO: check out https://github.com/netty/netty/issues/1282 for
-      // correct usage
-      ctx.channel().writeAndFlush(r.data());
+    protected void channelRead0(ChannelHandlerContext ctx,
+                                RpcResponse response) throws Exception {
+      ByteBuf buf = Unpooled.wrappedBuffer(response.data());
+      ctx.writeAndFlush(new DatagramPacket(
+              buf, (InetSocketAddress) response.recipient()));
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
index 1a8a305436c..23c7977e30d 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
@@ -117,15 +117,13 @@ final class Portmap {
         .childOption(ChannelOption.SO_REUSEADDR, true)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
-          private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-              0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
 
             p.addLast(RpcUtil.constructRpcFrameDecoder(),
-                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, new IdleStateHandler(0, 0,
+                            idleTimeMilliSeconds, TimeUnit.MILLISECONDS), handler,
                 RpcUtil.STAGE_RPC_TCP_RESPONSE);
           }});
 
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
index 8ebf9d03c6c..e2f7c03676c 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
@@ -23,8 +23,10 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Arrays;
 import java.util.Map;
 
+import org.apache.hadoop.oncrpc.RpcReply;
 import org.junit.Assert;
 
 import org.apache.hadoop.oncrpc.RpcCall;
@@ -36,6 +38,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestPortmap {
   private static Portmap pm = new Portmap();
   private static final int SHORT_TIMEOUT_MILLISECONDS = 10;
@@ -93,6 +97,19 @@ public class TestPortmap {
         pm.getUdpServerLoAddress());
     try {
       s.send(p);
+
+      // verify that the portmap server sends a UDP packet back to the client
+      byte[] receiveData = new byte[65535];
+      DatagramPacket receivePacket = new DatagramPacket(receiveData,
+              receiveData.length);
+      s.setSoTimeout(2000);
+      s.receive(receivePacket);
+
+      // verify that the registration is accepted.
+      XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
+              receivePacket.getLength()));
+      RpcReply reply = RpcReply.read(xdr);
+      assertEquals(reply.getState(), RpcReply.ReplyState.MSG_ACCEPTED);
     } finally {
       s.close();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org