You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/09/18 19:10:22 UTC

hadoop git commit: HDFS-11873. Ozone: Object store handler supports reusing http client for multiple requests. Contributed by Xiaoyu Yao and Weiwei Yang.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 8dbd0354c -> 1caf63789


HDFS-11873. Ozone: Object store handler supports reusing http client for multiple requests. Contributed by Xiaoyu Yao and Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1caf6378
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1caf6378
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1caf6378

Branch: refs/heads/HDFS-7240
Commit: 1caf637897f1d88670778ded10d42da7473fc571
Parents: 8dbd035
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon Sep 18 12:07:43 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon Sep 18 12:07:43 2017 -0700

----------------------------------------------------------------------
 .../web/netty/ObjectStoreJerseyContainer.java   |   1 +
 ...RequestContentObjectStoreChannelHandler.java |  21 +-
 ...equestDispatchObjectStoreChannelHandler.java |   2 +-
 .../ozone/web/client/TestOzoneClient.java       | 305 +++++++++++++++++++
 4 files changed, 327 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1caf6378/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
index c018663..40d60e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
@@ -231,6 +231,7 @@ public final class ObjectStoreJerseyContainer {
             ObjectStoreJerseyContainer.this.webapp, this.nettyReq, this.reqIn);
         ObjectStoreJerseyContainer.this.webapp.handleRequest(jerseyReq, this);
       } catch (Exception e) {
+        LOG.error("Error running Jersey Request Runner", e);
         this.exception = e;
         this.latch.countDown();
       } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1caf6378/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
index db8bf81..501d719 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
@@ -46,6 +46,7 @@ public final class RequestContentObjectStoreChannelHandler
   private final Future<HttpResponse> nettyResp;
   private final OutputStream reqOut;
   private final InputStream respIn;
+  private ObjectStoreJerseyContainer jerseyContainer;
 
   /**
    * Creates a new RequestContentObjectStoreChannelHandler.
@@ -54,13 +55,16 @@ public final class RequestContentObjectStoreChannelHandler
    * @param nettyResp asynchronous HTTP response
    * @param reqOut output stream for writing request body
    * @param respIn input stream for reading response body
+   * @param jerseyContainer jerseyContainer to handle the request
    */
   public RequestContentObjectStoreChannelHandler(HttpRequest nettyReq,
-      Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn) {
+      Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn,
+      ObjectStoreJerseyContainer jerseyContainer) {
     this.nettyReq = nettyReq;
     this.nettyResp = nettyResp;
     this.reqOut = reqOut;
     this.respIn = respIn;
+    this.jerseyContainer = jerseyContainer;
   }
 
   @Override
@@ -83,6 +87,21 @@ public final class RequestContentObjectStoreChannelHandler
       respFuture.addListener(new CloseableCleanupListener(this.respIn));
       if (!HttpHeaders.isKeepAlive(this.nettyReq)) {
         respFuture.addListener(ChannelFutureListener.CLOSE);
+      } else {
+        respFuture.addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            // Notify client this is the last content for current request.
+            ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+            // Reset the pipeline handler so that the next request can reuse
+            // the same connection.
+            RequestDispatchObjectStoreChannelHandler h =
+                new RequestDispatchObjectStoreChannelHandler(jerseyContainer);
+            ctx.pipeline().replace(ctx.pipeline().last(),
+                RequestDispatchObjectStoreChannelHandler.class.getSimpleName(),
+                h);
+          }
+        });
       }
     }
     LOG.trace(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1caf6378/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
index 993c10f..f08ddc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
@@ -88,7 +88,7 @@ public final class RequestDispatchObjectStoreChannelHandler
     ctx.pipeline().replace(this,
         RequestContentObjectStoreChannelHandler.class.getSimpleName(),
         new RequestContentObjectStoreChannelHandler(nettyReq, nettyResp,
-            reqOut, respIn));
+            reqOut, respIn, jerseyContainer));
 
     LOG.trace("end RequestDispatchObjectStoreChannelHandler channelRead0, " +
         "ctx = {}, nettyReq = {}", ctx, nettyReq);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1caf6378/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java
new file mode 100644
index 0000000..e7444d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpContentDecompressor;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.rest.headers.Header;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.UUID;
+
+import static io.netty.util.CharsetUtil.UTF_8;
+
+/**
+ * Unit tests for Ozone client connection reuse with Apache HttpClient and Netty
+ * based HttpClient.
+ */
+public class TestOzoneClient {
+  private static Logger log = Logger.getLogger(TestOzoneClient.class);
+  private static int testVolumeCount = 5;
+  private static MiniOzoneCluster cluster = null;
+  private static String endpoint = null;
+
+  /** Builds a MiniOzoneCluster and records the first DataNode's endpoint. */
+  @BeforeClass
+  public static void init() throws Exception {
+    Logger.getLogger("org.apache.http").setLevel(Level.ALL); // real logger name
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String type = OzoneConsts.OZONE_HANDLER_DISTRIBUTED;
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, type);
+    cluster = new MiniOzoneCluster.Builder(conf).setHandlerType(type).build();
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    endpoint = String.format("http://localhost:%d", dataNode.getInfoPort());
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) { // still null when init() never assigned it
+      cluster.shutdown();
+    }
+  }
+
+  /** Creates every volume through its own, freshly created http client. */
+  @Test(timeout = 5000)
+  public void testNewConnectionPerRequest()
+      throws IOException, URISyntaxException {
+    for (int index = 0; index < testVolumeCount; index++) {
+      try (CloseableHttpClient client = HttpClients.createDefault()) {
+        createVolume(getRandomVolumeName(index), client);
+      }
+    }
+  }
+
+  /**
+   * The object store handler should serve several requests issued by one
+   * http client. Clients can then keep their connections in a pool and
+   * reuse them, which is much cheaper than opening a new connection for
+   * every request.
+   */
+  @Test(timeout = 5000)
+  public void testReuseWithApacheHttpClient()
+      throws IOException, URISyntaxException {
+    // Pool limits: 200 connections in total, at most 20 per route.
+    PoolingHttpClientConnectionManager pool =
+        new PoolingHttpClientConnectionManager();
+    pool.setMaxTotal(200);
+    pool.setDefaultMaxPerRoute(20);
+
+    // Every volume is created through this one pooled client.
+    try (CloseableHttpClient client =
+             HttpClients.custom().setConnectionManager(pool).build()) {
+      for (int index = 0; index < testVolumeCount; index++) {
+        createVolume(getRandomVolumeName(index), client);
+      }
+    }
+  }
+
+  /** First failure raised on the netty event loop; null if there was none. */
+  private volatile Throwable nettyFailure;
+
+  /**
+   * Sends several create-volume requests over a single netty channel and
+   * fails if a request cannot be built or the handler recorded a failure.
+   */
+  @Test(timeout = 10000)
+  public void testReuseWithNettyHttpClient()
+      throws IOException, InterruptedException, URISyntaxException {
+    URI uri = new URI(endpoint);
+    String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
+    int port = uri.getPort();
+
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    try {
+      Bootstrap b = new Bootstrap();
+      b.group(workerGroup)
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .option(ChannelOption.SO_REUSEADDR, true)
+          .handler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) {
+              ChannelPipeline p = ch.pipeline();
+
+              // Comment the following line if you don't want client http trace
+              p.addLast("log", new LoggingHandler(LogLevel.INFO));
+              p.addLast(new HttpClientCodec());
+              p.addLast(new HttpContentDecompressor());
+              p.addLast(new NettyHttpClientHandler());
+            }
+          });
+
+      Channel ch = b.connect(host, port).sync().channel();
+      for (int i = 0; i < testVolumeCount; i++) {
+        sendNettyCreateVolumeRequest(ch, getRandomVolumeName(i));
+        Thread.sleep(1000);
+      }
+
+      Thread.sleep(1000);
+      ch.close();
+      // Wait until the channel is fully closed.
+      ch.closeFuture().sync();
+    } finally {
+      workerGroup.shutdownGracefully();
+    }
+    // Assertions made on the event loop thread cannot fail the test by
+    // themselves, so the handler records them and they are rethrown here.
+    if (nettyFailure != null) {
+      throw new AssertionError("Netty client failure", nettyFailure);
+    }
+  }
+
+  /** Logs each response part and asserts that every response is CREATED. */
+  class NettyHttpClientHandler extends
+      SimpleChannelInboundHandler<HttpObject> {
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+      if (msg instanceof HttpResponse) {
+        HttpResponse response = (HttpResponse) msg;
+        log.info("STATUS: " + response.getStatus());
+        log.info("VERSION: " + response.getProtocolVersion());
+        Assert.assertEquals(HttpResponseStatus.CREATED.code(),
+            response.getStatus().code());
+      }
+      if (msg instanceof HttpContent) {
+        HttpContent content = (HttpContent) msg;
+        log.info(content.content().toString(UTF_8));
+        if (content instanceof LastHttpContent) {
+          log.info("END OF CONTENT");
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+      log.error("Exception upon channel read", cause);
+      // Remember the first failure so that the test thread can rethrow it.
+      if (nettyFailure == null) {
+        nettyFailure = cause;
+      }
+      ctx.close();
+    }
+  }
+
+  /** Returns "test-volume-", the given index, "-" and a random UUID. */
+  private String getRandomVolumeName(int index) {
+    String suffix = UUID.randomUUID().toString();
+    return "test-volume-" + index + "-" + suffix;
+  }
+
+  // Prepare the HTTP request and send it over the netty channel.
+  private void sendNettyCreateVolumeRequest(Channel channel, String volumeName)
+      throws URISyntaxException, IOException {
+    URI uri = new URIBuilder(endpoint).setPath("/" + volumeName).build();
+
+    String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
+    FullHttpRequest request = new DefaultFullHttpRequest(
+        HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath());
+
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    request.headers().set(HttpHeaders.HOST, host);
+    request.headers().add(HttpHeaders.CONTENT_TYPE, "application/json");
+    request.headers().set(Header.OZONE_VERSION_HEADER,
+        Header.OZONE_V1_VERSION_HEADER);
+    // Use wall-clock time for the Date header: monotonicNow() has an arbitrary
+    // time base and is only meant for measuring intervals.
+    request.headers().set(HttpHeaders.DATE,
+        format.format(new Date(Time.now())));
+    request.headers().set(Header.OZONE_USER,
+        UserGroupInformation.getCurrentUser().getUserName());
+    request.headers().set(HttpHeaders.AUTHORIZATION,
+        Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " "
+            + OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+    // Send the HTTP request via netty channel.
+    channel.writeAndFlush(request);
+  }
+
+  /**
+   * Posts a create-volume request and asserts that the reply is HTTP_CREATED.
+   * It is the caller's responsibility to close the client.
+   */
+  private void createVolume(String volumeName, CloseableHttpClient httpClient)
+      throws IOException, URISyntaxException {
+    HttpPost create = getCreateVolumeRequest(volumeName);
+    // Let IOException propagate so that a failed request fails the test, and
+    // close the response once its entity has been consumed.
+    try (CloseableHttpResponse response = httpClient.execute(create)) {
+      try {
+        Assert.assertEquals(HttpURLConnection.HTTP_CREATED,
+            response.getStatusLine().getStatusCode());
+      } finally {
+        EntityUtils.consumeQuietly(response.getEntity());
+      }
+    }
+  }
+
+  /** Builds the POST request, with Ozone headers, that creates a volume. */
+  private HttpPost getCreateVolumeRequest(String volumeName)
+      throws URISyntaxException, IOException {
+    URI uri = new URIBuilder(endpoint).setPath("/" + volumeName).build();
+    HttpPost httpPost = new HttpPost(uri);
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    httpPost.addHeader(Header.OZONE_VERSION_HEADER,
+        Header.OZONE_V1_VERSION_HEADER);
+    // The Date header needs wall-clock time; monotonicNow() is not epoch-based.
+    httpPost.addHeader(HttpHeaders.DATE, format.format(new Date(Time.now())));
+    httpPost.addHeader(Header.OZONE_USER,
+        UserGroupInformation.getCurrentUser().getUserName());
+    httpPost.addHeader(HttpHeaders.AUTHORIZATION,
+        Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " "
+            + OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+    return httpPost;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org