You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2019/02/12 06:58:58 UTC

[asterixdb] 04/04: Merge commit '6312edf' from 'stabilization-f69489' into 'master'

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit c79e2d5a056381a59bd9e70fc0b5f630e8c001a7
Merge: 47d2f36 6312edf
Author: Michael Blow <mb...@apache.org>
AuthorDate: Mon Feb 11 21:31:22 2019 -0500

    Merge commit '6312edf' from 'stabilization-f69489' into 'master'
    
    Change-Id: I6f2daa3ab112d8cb9210ed94db812ac1743bce58

 .../apache/asterix/api/http/server/ApiServlet.java |  20 ++-
 .../asterix/api/http/server/ClusterApiServlet.java |   2 +-
 .../server/ClusterControllerDetailsApiServlet.java |   2 +-
 .../api/http/server/ConnectorApiServlet.java       |   2 +-
 .../api/http/server/DiagnosticsApiServlet.java     |   2 +-
 .../api/http/server/NetDiagnosticsApiServlet.java  |   2 +-
 .../server/NodeControllerDetailsApiServlet.java    |   2 +-
 .../api/http/server/QueryResultApiServlet.java     |  13 +-
 .../api/http/server/QueryServiceServlet.java       |   6 +-
 .../api/http/server/QueryStatusApiServlet.java     |  10 +-
 .../api/http/server/RebalanceApiServlet.java       |   2 +-
 .../asterix/api/http/server/RestApiServlet.java    |   2 +-
 .../api/http/server/ShutdownApiServlet.java        |   2 +-
 .../asterix/api/http/server/StorageApiServlet.java |   2 +-
 .../asterix/api/http/server/VersionApiServlet.java |   2 +-
 .../api/http/server/QueryWebInterfaceServlet.java  |   2 +-
 .../org/apache/hyracks/api/util/InvokeUtil.java    |  33 +++--
 .../cc/web/util/JSONOutputRequestHandler.java      |   6 +-
 hyracks-fullstack/hyracks/hyracks-http/pom.xml     |  12 ++
 .../hyracks/http/server/ChunkedResponse.java       |  25 +++-
 .../apache/hyracks/http/server/FullResponse.java   |  17 ++-
 .../apache/hyracks/http/server/utils/HttpUtil.java |  97 +++++++++++++-
 .../apache/hyracks/http/PipelinedRequestsTest.java |   2 +-
 .../hyracks/test/http/HttpAcceptCharsetTest.java   |  90 +++++++++++++
 .../{http/test => test/http}/HttpRequestTask.java  |   2 +-
 .../hyracks/test/http/HttpServerEncodingTest.java  | 145 +++++++++++++++++++++
 .../{http/test => test/http}/HttpServerTest.java   |  15 +--
 .../hyracks/{ => test}/http/HttpTestUtil.java      |   2 +-
 .../{ => test}/http/servlet/ChattyServlet.java     |   9 +-
 .../http/servlet/CompliantEchoServlet.java}        |  20 +--
 .../{ => test}/http/servlet/EchoServlet.java       |   4 +-
 .../{ => test}/http/servlet/SleepyServlet.java     |   4 +-
 .../hyracks/util/string/UTF8StringSample.java      |  30 +++--
 .../hyracks/util/string/UTF8StringUtilTest.java    |   2 -
 34 files changed, 471 insertions(+), 117 deletions(-)

diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java
index badb568,0000000..ffcb74d
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java
@@@ -1,63 -1,0 +1,63 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.api.http.server;
 +
 +import java.io.IOException;
 +import java.io.PrintWriter;
 +import java.util.concurrent.ConcurrentMap;
 +
 +import org.apache.asterix.common.api.INcApplicationContext;
 +import org.apache.hyracks.control.nc.NodeControllerService;
 +import org.apache.hyracks.control.nc.application.NCServiceContext;
 +import org.apache.hyracks.control.nc.net.NetworkManager;
 +import org.apache.hyracks.http.api.IServletRequest;
 +import org.apache.hyracks.http.api.IServletResponse;
 +import org.apache.hyracks.http.server.AbstractServlet;
 +import org.apache.hyracks.http.server.utils.HttpUtil;
 +import org.apache.hyracks.util.JSONUtil;
 +
 +import com.fasterxml.jackson.databind.JsonNode;
 +
 +import io.netty.handler.codec.http.HttpResponseStatus;
 +
 +public class NetDiagnosticsApiServlet extends AbstractServlet {
 +
 +    private final INcApplicationContext appCtx;
 +
 +    public NetDiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, INcApplicationContext appCtx, String... paths) {
 +        super(ctx, paths);
 +        this.appCtx = appCtx;
 +    }
 +
 +    @Override
 +    protected void get(IServletRequest request, IServletResponse response) throws IOException {
-         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
++        HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
 +        response.setStatus(HttpResponseStatus.OK);
 +        final JsonNode netDiagnostics = getNetDiagnostics();
 +        final PrintWriter responseWriter = response.writer();
 +        JSONUtil.writeNode(responseWriter, netDiagnostics);
 +    }
 +
 +    private JsonNode getNetDiagnostics() {
 +        final NCServiceContext serviceContext = (NCServiceContext) appCtx.getServiceContext();
 +        final NodeControllerService controllerService = (NodeControllerService) serviceContext.getControllerService();
 +        final NetworkManager networkManager = controllerService.getNetworkManager();
 +        return networkManager.getMuxDemux().getState();
 +    }
 +}
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index a34f6b4,625834f..31b784f
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@@ -122,8 -114,20 +122,8 @@@ public class QueryServiceServlet extend
      }
  
      @Override
--    protected void post(IServletRequest request, IServletResponse response) {
 -        try {
 -            handleRequest(request, response);
 -        } catch (IOException e) {
 -            // Servlet methods should not throw exceptions
 -            // http://cwe.mitre.org/data/definitions/600.html
 -            GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e);
 -        } catch (Throwable th) {// NOSONAR: Logging and re-throwing
 -            try {
 -                GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, th.getMessage(), th);
 -            } catch (Throwable ignored) { // NOSONAR: Logging failure
 -            }
 -            throw th;
 -        }
++    protected void post(IServletRequest request, IServletResponse response) throws IOException {
 +        handleRequest(request, response);
      }
  
      @Override
@@@ -509,21 -508,39 +509,21 @@@
          return "http://" + host + path + handlePath(delivery);
      }
  
-     private void handleRequest(IServletRequest request, IServletResponse response) {
+     private void handleRequest(IServletRequest request, IServletResponse response) throws IOException {
 -        QueryServiceRequestParameters param = getRequestParameters(request);
 -        LOGGER.info("handleRequest: {}", param);
 +        final IRequestReference requestRef = receptionist.welcome(request);
          long elapsedStart = System.nanoTime();
 -        HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
 -        final PrintWriter httpWriter = response.writer();
 -
 -        ResultDelivery delivery = parseResultDelivery(param.getMode());
 -
 -        final ResultProperties resultProperties = param.getMaxResultReads() == null ? new ResultProperties(delivery)
 -                : new ResultProperties(delivery, Long.parseLong(param.getMaxResultReads()));
 -
 -        String handleUrl = getHandleUrl(param.getHost(), param.getPath(), delivery);
 -        SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter);
 -        SessionConfig sessionConfig = sessionOutput.config();
 -
 +        long errorCount = 1;
          Stats stats = new Stats();
          RequestExecutionState execution = new RequestExecutionState();
 -
 -        // buffer the output until we are ready to set the status of the response message correctly
 -        sessionOutput.hold();
 -        sessionOutput.out().print("{\n");
 -        printRequestId(sessionOutput.out());
 -        printClientContextID(sessionOutput.out(), param);
 -        printSignature(sessionOutput.out(), param);
 -        printType(sessionOutput.out(), sessionConfig);
 -        long errorCount = 1; // so far we just return 1 error
 -        List<ExecutionWarning> warnings = Collections.emptyList(); // we don't have any warnings yet
 +        List<ExecutionWarning> warnings = Collections.emptyList();
++        HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
 +        PrintWriter httpWriter = response.writer();
 +        SessionOutput sessionOutput = createSessionOutput(httpWriter);
 +        QueryServiceRequestParameters param = new QueryServiceRequestParameters();
          try {
 -            if (param.getStatement() == null || param.getStatement().isEmpty()) {
 -                throw new RuntimeDataException(ErrorCode.NO_STATEMENT_PROVIDED);
 -            }
 -            String statementsText = param.getStatement() + ";";
 +            // buffer the output until we are ready to set the status of the response message correctly
 +            sessionOutput.hold();
 +            sessionOutput.out().print("{\n");
-             HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
              Map<String, String> optionalParams = null;
              if (optionalParamProvider != null) {
                  optionalParams = optionalParamProvider.apply(request);
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 0aadb1e,e72bee9..2b2bdef
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@@ -18,12 -18,15 +18,16 @@@
   */
  package org.apache.hyracks.http.server;
  
+ import java.io.BufferedWriter;
  import java.io.IOException;
  import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
  import java.io.PrintWriter;
+ import java.nio.charset.Charset;
+ import java.nio.charset.StandardCharsets;
  
  import org.apache.hyracks.http.api.IServletResponse;
 +import org.apache.hyracks.http.server.utils.HttpUtil;
  import org.apache.logging.log4j.Level;
  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
@@@ -64,23 -67,22 +68,22 @@@ public class ChunkedResponse implement
      private static final Logger LOGGER = LogManager.getLogger();
      private final ChannelHandlerContext ctx;
      private final ChunkedNettyOutputStream outputStream;
-     private final PrintWriter writer;
 +    private final HttpServerHandler<?> handler;
+     private PrintWriter writer;
 -    private HttpResponse response;
 +    private DefaultHttpResponse response;
      private boolean headerSent;
      private ByteBuf error;
      private ChannelFuture future;
      private boolean done;
 -    private final boolean keepAlive;
  
 -    public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
 +    public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request,
 +            int chunkSize) {
 +        this.handler = handler;
          this.ctx = ctx;
          outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
-         writer = new PrintWriter(outputStream);
          response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
          response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
 -        keepAlive = HttpUtil.isKeepAlive(request);
 -        response.headers().set(HttpHeaderNames.CONNECTION,
 -                keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);
 +        HttpUtil.setConnectionHeader(request, response);
      }
  
      @Override
@@@ -104,10 -114,14 +115,14 @@@
  
      @Override
      public void close() throws IOException {
-         writer.close();
+         if (writer != null) {
+             writer.close();
+         } else {
+             outputStream.close();
+         }
          if (error == null && response.status() == HttpResponseStatus.OK) {
              if (!done) {
 -                future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 +                respond(LastHttpContent.EMPTY_LAST_CONTENT);
              }
          } else {
              // There was an error
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 2b1e8b4,85a0a43..f1ff92c
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@@ -21,10 -22,12 +22,13 @@@ import java.io.BufferedWriter
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
  import java.io.PrintWriter;
+ import java.nio.charset.Charset;
+ import java.nio.charset.StandardCharsets;
  
  import org.apache.hyracks.http.api.IServletResponse;
 +import org.apache.hyracks.http.server.utils.HttpUtil;
  
  import io.netty.buffer.Unpooled;
  import io.netty.channel.ChannelFuture;
@@@ -40,28 -45,38 +44,29 @@@ import io.netty.handler.codec.http.Http
  public class FullResponse implements IServletResponse {
      private final ChannelHandlerContext ctx;
      private final ByteArrayOutputStream baos;
-     private final PrintWriter writer;
 -    private final FullHttpResponse response;
 -    private final boolean keepAlive;
 +    private final DefaultFullHttpResponse response;
 +    private final HttpServerHandler<?> handler;
+     private PrintWriter writer;
      private ChannelFuture future;
  
 -    public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
 +    public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request) {
 +        this.handler = handler;
          this.ctx = ctx;
          baos = new ByteArrayOutputStream();
-         writer = new PrintWriter(baos);
          response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
 -        keepAlive = HttpUtil.isKeepAlive(request);
 -        if (keepAlive) {
 -            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
 -        }
 +        HttpUtil.setConnectionHeader(request, response);
      }
  
      @Override
      public void close() throws IOException {
-         writer.close();
+         if (writer != null) {
+             writer.close();
+         }
          FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray()));
 -        if (keepAlive) {
 -            if (response.status() == HttpResponseStatus.OK || response.status() == HttpResponseStatus.UNAUTHORIZED) {
 -                fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
 -            } else {
 -                fullResponse.headers().remove(HttpHeaderNames.CONNECTION);
 -            }
 -        }
 -        future = ctx.writeAndFlush(fullResponse);
 -        if (response.status() != HttpResponseStatus.OK && response.status() != HttpResponseStatus.UNAUTHORIZED) {
 -            future.addListener(ChannelFutureListener.CLOSE);
 -        }
 +        fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
 +        final ChannelPromise responseCompletionPromise = ctx.newPromise();
 +        responseCompletionPromise.addListener(handler);
 +        future = ctx.writeAndFlush(fullResponse, responseCompletionPromise);
      }
  
      @Override
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 1fd8cd2,b34b9dc..1a51318
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@@ -29,17 -35,17 +35,21 @@@ import org.apache.hyracks.http.api.ISer
  import org.apache.hyracks.http.api.IServletResponse;
  import org.apache.hyracks.http.server.BaseRequest;
  import org.apache.hyracks.http.server.FormUrlEncodedRequest;
+ import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.Logger;
  
 +import io.netty.channel.ChannelHandlerContext;
 +import io.netty.handler.codec.http.DefaultHttpResponse;
  import io.netty.handler.codec.http.FullHttpRequest;
  import io.netty.handler.codec.http.HttpHeaderNames;
 +import io.netty.handler.codec.http.HttpHeaderValues;
  import io.netty.handler.codec.http.HttpRequest;
 +import io.netty.util.AsciiString;
  
  public class HttpUtil {
+     private static final Logger LOGGER = LogManager.getLogger();
      private static final Pattern PARENT_DIR = Pattern.compile("/[^./]+/\\.\\./");
+     private static final String DEFAULT_RESPONSE_CHARSET = StandardCharsets.UTF_8.name();
  
      private HttpUtil() {
      }
@@@ -159,9 -169,77 +174,83 @@@
          return clusterURL;
      }
  
 +    public static void setConnectionHeader(HttpRequest request, DefaultHttpResponse response) {
 +        final boolean keepAlive = io.netty.handler.codec.http.HttpUtil.isKeepAlive(request);
 +        final AsciiString connectionHeaderValue = keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE;
 +        response.headers().set(HttpHeaderNames.CONNECTION, connectionHeaderValue);
 +    }
++
+     public static String getPreferredCharset(IServletRequest request) {
+         return getPreferredCharset(request, DEFAULT_RESPONSE_CHARSET);
+     }
+ 
+     public static String getPreferredCharset(IServletRequest request, String defaultCharset) {
+         String acceptCharset = request.getHeader(HttpHeaderNames.ACCEPT_CHARSET);
+         if (acceptCharset == null) {
+             return defaultCharset;
+         }
+         // If no "q" parameter is present, the default weight is 1 [https://tools.ietf.org/html/rfc7231#section-5.3.1]
+         Optional<String> preferredCharset = Stream.of(StringUtils.split(acceptCharset, ","))
+                 .map(WeightedHeaderValue::new).sorted().map(WeightedHeaderValue::getValue)
+                 .map(a -> "*".equals(a) ? defaultCharset : a).filter(value -> {
+                     if (!Charset.isSupported(value)) {
+                         LOGGER.info("disregarding unsupported charset '{}'", value);
+                         return false;
+                     }
+                     return true;
+                 }).findFirst();
+         return preferredCharset.orElse(defaultCharset);
+     }
+ 
+     private static class WeightedHeaderValue implements Comparable<WeightedHeaderValue> {
+ 
+         final String value;
+         final double weight;
+ 
+         WeightedHeaderValue(String value) {
+             // Accept-Charset = 1#( ( charset / "*" ) [ weight ] )
+             // weight = OWS ";" OWS "q=" qvalue
+             String[] splits = StringUtils.split(value, ";");
+             this.value = splits[0].trim();
+             if (splits.length == 1) {
+                 weight = 1.0d;
+             } else {
+                 OptionalDouble specifiedWeight = Stream.of(splits).skip(1).map(String::trim).map(String::toLowerCase)
+                         .filter(a -> a.startsWith("q="))
+                         .mapToDouble(segment -> Double.parseDouble(StringUtils.splitByWholeSeparator(segment, "q=")[0]))
+                         .findFirst();
+                 this.weight = specifiedWeight.orElse(1.0d);
+             }
+         }
+ 
+         public String getValue() {
+             return value;
+         }
+ 
+         public double getWeight() {
+             return weight;
+         }
+ 
+         @Override
+         public int compareTo(WeightedHeaderValue o) {
+             return Double.compare(o.weight, weight);
+         }
+ 
+         @Override
+         public boolean equals(Object o) {
+             if (this == o) {
+                 return true;
+             }
+             if (o == null || getClass() != o.getClass()) {
+                 return false;
+             }
+             WeightedHeaderValue that = (WeightedHeaderValue) o;
+             return Double.compare(that.weight, weight) == 0 && Objects.equals(value, that.value);
+         }
+ 
+         @Override
+         public int hashCode() {
+             return Objects.hash(value, weight);
+         }
+     }
  }
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
index 0c96eb6,0000000..c0117f1
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
@@@ -1,140 -1,0 +1,140 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.hyracks.http;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.http.HttpHost;
 +import org.apache.http.HttpResponse;
 +import org.apache.http.HttpStatus;
 +import org.apache.http.concurrent.FutureCallback;
 +import org.apache.http.config.ConnectionConfig;
 +import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
 +import org.apache.http.impl.nio.pool.BasicNIOConnPool;
 +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 +import org.apache.http.message.BasicHttpRequest;
 +import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
 +import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
 +import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
 +import org.apache.http.nio.protocol.HttpAsyncRequester;
 +import org.apache.http.nio.reactor.ConnectingIOReactor;
 +import org.apache.http.nio.reactor.IOEventDispatch;
 +import org.apache.http.protocol.HttpCoreContext;
 +import org.apache.http.protocol.HttpProcessor;
 +import org.apache.http.protocol.HttpProcessorBuilder;
 +import org.apache.http.protocol.RequestConnControl;
 +import org.apache.http.protocol.RequestContent;
 +import org.apache.http.protocol.RequestExpectContinue;
 +import org.apache.http.protocol.RequestTargetHost;
 +import org.apache.hyracks.http.server.HttpServer;
 +import org.apache.hyracks.http.server.HttpServerConfig;
 +import org.apache.hyracks.http.server.HttpServerConfigBuilder;
 +import org.apache.hyracks.http.server.InterruptOnCloseHandler;
 +import org.apache.hyracks.http.server.WebManager;
- import org.apache.hyracks.http.servlet.SleepyServlet;
++import org.apache.hyracks.test.http.servlet.SleepyServlet;
 +import org.apache.logging.log4j.LogManager;
 +import org.apache.logging.log4j.Logger;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class PipelinedRequestsTest {
 +
 +    private static final Logger LOGGER = LogManager.getLogger();
 +    private static final int PORT = 9898;
 +    private static final String PATH = "/";
 +
 +    @Test
 +    public void pipelinedRequests() throws Exception {
 +        setupServer();
 +        final HttpHost target = new HttpHost("localhost", PORT);
 +        final List<BasicAsyncRequestProducer> requestProducers =
 +                Arrays.asList(new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)),
 +                        new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)));
 +        final List<BasicAsyncResponseConsumer> responseConsumers =
 +                Arrays.asList(new BasicAsyncResponseConsumer(), new BasicAsyncResponseConsumer());
 +        final List<HttpResponse> httpResponses = executePipelined(target, requestProducers, responseConsumers);
 +        for (HttpResponse response : httpResponses) {
 +            Assert.assertNotEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
 +        }
 +    }
 +
 +    private void setupServer() throws Exception {
 +        final WebManager webMgr = new WebManager();
 +        final HttpServerConfig config =
 +                HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
 +        final HttpServer server =
 +                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config, InterruptOnCloseHandler.INSTANCE);
 +        final SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
 +        server.addServlet(servlet);
 +        webMgr.add(server);
 +        webMgr.start();
 +    }
 +
 +    private List<HttpResponse> executePipelined(HttpHost host, List<BasicAsyncRequestProducer> requestProducers,
 +            List<BasicAsyncResponseConsumer> responseConsumers) throws Exception {
 +        final List<HttpResponse> results = new ArrayList<>();
 +        final HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
 +        final IOEventDispatch ioEventDispatch =
 +                new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);
 +        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
 +        final BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT);
 +        pool.setDefaultMaxPerRoute(1);
 +        pool.setMaxTotal(1);
 +        final Thread reactorThread = new Thread(() -> {
 +            try {
 +                ioReactor.execute(ioEventDispatch);
 +            } catch (final IOException e) {
 +                LOGGER.error(e);
 +            }
 +        });
 +        reactorThread.start();
 +        final HttpCoreContext context = HttpCoreContext.create();
 +        final CountDownLatch latch = new CountDownLatch(1);
 +        final HttpProcessor httpProc =
 +                HttpProcessorBuilder.create().add(new RequestContent()).add(new RequestTargetHost())
 +                        .add(new RequestConnControl()).add(new RequestExpectContinue(true)).build();
 +        final HttpAsyncRequester requester = new HttpAsyncRequester(httpProc);
 +        requester.executePipelined(host, requestProducers, responseConsumers, pool, context,
 +                new FutureCallback<List<HttpResponse>>() {
 +                    @Override
 +                    public void completed(final List<HttpResponse> result) {
 +                        results.addAll(result);
 +                        latch.countDown();
 +                    }
 +
 +                    @Override
 +                    public void failed(final Exception ex) {
 +                        latch.countDown();
 +                    }
 +
 +                    @Override
 +                    public void cancelled() {
 +                        latch.countDown();
 +                    }
 +                });
 +        latch.await(5, TimeUnit.SECONDS);
 +        ioReactor.shutdown();
 +        return results;
 +    }
 +}
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
index c1d1315,ea52b0b..e341e2f
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
@@@ -33,22 -32,13 +33,21 @@@ import java.util.concurrent.Executors
  import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicInteger;
  
 +import org.apache.commons.lang3.RandomStringUtils;
 +import org.apache.http.client.methods.CloseableHttpResponse;
 +import org.apache.http.client.methods.HttpPost;
 +import org.apache.http.entity.StringEntity;
 +import org.apache.http.impl.client.CloseableHttpClient;
 +import org.apache.http.impl.client.HttpClients;
 +import org.apache.http.util.EntityUtils;
- import org.apache.hyracks.http.HttpTestUtil;
  import org.apache.hyracks.http.server.HttpServer;
  import org.apache.hyracks.http.server.HttpServerConfig;
  import org.apache.hyracks.http.server.HttpServerConfigBuilder;
  import org.apache.hyracks.http.server.InterruptOnCloseHandler;
  import org.apache.hyracks.http.server.WebManager;
- import org.apache.hyracks.http.servlet.ChattyServlet;
- import org.apache.hyracks.http.servlet.EchoServlet;
- import org.apache.hyracks.http.servlet.SleepyServlet;
+ import org.apache.hyracks.test.http.servlet.ChattyServlet;
++import org.apache.hyracks.test.http.servlet.EchoServlet;
+ import org.apache.hyracks.test.http.servlet.SleepyServlet;
  import org.apache.hyracks.util.StorageUtil;
  import org.apache.logging.log4j.Level;
  import org.apache.logging.log4j.LogManager;
@@@ -379,42 -369,6 +378,36 @@@ public class HttpServerTest 
          Assert.assertNotNull(failure);
      }
  
-     public static void setPrivateField(Object obj, String filedName, Object value) throws Exception {
-         Field f = obj.getClass().getDeclaredField(filedName);
-         f.setAccessible(true);
-         f.set(obj, value);
-     }
- 
 +    @Test
 +    public void chunkedRequestTest() throws Exception {
 +        final WebManager webMgr = new WebManager();
 +        final int serverRequestChunkSize = StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.KILOBYTE);
 +        final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16)
 +                .setMaxRequestChunkSize(serverRequestChunkSize).build();
 +        final HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config);
 +        EchoServlet servlet = new EchoServlet(server.ctx(), PATH);
 +        server.addServlet(servlet);
 +        webMgr.add(server);
 +        webMgr.start();
 +        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
 +            final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT,
 +                    HttpServerTest.PATH, null, null);
 +            final HttpPost postRequest = new HttpPost(uri);
 +            final int requestSize = StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE);
 +            final String requestBody = RandomStringUtils.randomAlphanumeric(requestSize);
 +            final StringEntity chunkedEntity = new StringEntity(requestBody);
 +            chunkedEntity.setChunked(true);
 +            postRequest.setEntity(chunkedEntity);
 +            try (CloseableHttpResponse response = httpClient.execute(postRequest)) {
 +                final String responseBody = EntityUtils.toString(response.getEntity());
 +                Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpResponseStatus.OK.code());
 +                Assert.assertEquals(responseBody, requestBody);
 +            }
 +        } finally {
 +            webMgr.stop();
 +        }
 +    }
 +
      private void request(int count) throws URISyntaxException {
          request(count, 0);
      }
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/servlet/EchoServlet.java
index 9298aed,0000000..2da86f3
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/servlet/EchoServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/servlet/EchoServlet.java
@@@ -1,46 -1,0 +1,46 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
- package org.apache.hyracks.http.servlet;
++package org.apache.hyracks.test.http.servlet;
 +
 +import java.util.concurrent.ConcurrentMap;
 +
 +import org.apache.hyracks.http.api.IServletRequest;
 +import org.apache.hyracks.http.api.IServletResponse;
 +import org.apache.hyracks.http.server.AbstractServlet;
 +import org.apache.hyracks.http.server.utils.HttpUtil;
 +
 +import io.netty.handler.codec.http.HttpResponseStatus;
 +
 +/**
 + * A servlet that echoes the received request body
 + */
 +public class EchoServlet extends AbstractServlet {
 +
 +    public EchoServlet(ConcurrentMap<String, Object> ctx, String... paths) {
 +        super(ctx, paths);
 +    }
 +
 +    @Override
 +    protected void post(IServletRequest request, IServletResponse response) throws Exception {
 +        final String requestBody = HttpUtil.getRequestBody(request);
 +        response.setStatus(HttpResponseStatus.OK);
-         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8);
++        HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, request);
 +        response.writer().write(requestBody);
 +    }
 +}