You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2019/02/12 06:58:58 UTC
[asterixdb] 04/04: Merge commit '6312edf' from
'stabilization-f69489' into 'master'
This is an automated email from the ASF dual-hosted git repository.
mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit c79e2d5a056381a59bd9e70fc0b5f630e8c001a7
Merge: 47d2f36 6312edf
Author: Michael Blow <mb...@apache.org>
AuthorDate: Mon Feb 11 21:31:22 2019 -0500
Merge commit '6312edf' from 'stabilization-f69489' into 'master'
Change-Id: I6f2daa3ab112d8cb9210ed94db812ac1743bce58
.../apache/asterix/api/http/server/ApiServlet.java | 20 ++-
.../asterix/api/http/server/ClusterApiServlet.java | 2 +-
.../server/ClusterControllerDetailsApiServlet.java | 2 +-
.../api/http/server/ConnectorApiServlet.java | 2 +-
.../api/http/server/DiagnosticsApiServlet.java | 2 +-
.../api/http/server/NetDiagnosticsApiServlet.java | 2 +-
.../server/NodeControllerDetailsApiServlet.java | 2 +-
.../api/http/server/QueryResultApiServlet.java | 13 +-
.../api/http/server/QueryServiceServlet.java | 6 +-
.../api/http/server/QueryStatusApiServlet.java | 10 +-
.../api/http/server/RebalanceApiServlet.java | 2 +-
.../asterix/api/http/server/RestApiServlet.java | 2 +-
.../api/http/server/ShutdownApiServlet.java | 2 +-
.../asterix/api/http/server/StorageApiServlet.java | 2 +-
.../asterix/api/http/server/VersionApiServlet.java | 2 +-
.../api/http/server/QueryWebInterfaceServlet.java | 2 +-
.../org/apache/hyracks/api/util/InvokeUtil.java | 33 +++--
.../cc/web/util/JSONOutputRequestHandler.java | 6 +-
hyracks-fullstack/hyracks/hyracks-http/pom.xml | 12 ++
.../hyracks/http/server/ChunkedResponse.java | 25 +++-
.../apache/hyracks/http/server/FullResponse.java | 17 ++-
.../apache/hyracks/http/server/utils/HttpUtil.java | 97 +++++++++++++-
.../apache/hyracks/http/PipelinedRequestsTest.java | 2 +-
.../hyracks/test/http/HttpAcceptCharsetTest.java | 90 +++++++++++++
.../{http/test => test/http}/HttpRequestTask.java | 2 +-
.../hyracks/test/http/HttpServerEncodingTest.java | 145 +++++++++++++++++++++
.../{http/test => test/http}/HttpServerTest.java | 15 +--
.../hyracks/{ => test}/http/HttpTestUtil.java | 2 +-
.../{ => test}/http/servlet/ChattyServlet.java | 9 +-
.../http/servlet/CompliantEchoServlet.java} | 20 +--
.../{ => test}/http/servlet/EchoServlet.java | 4 +-
.../{ => test}/http/servlet/SleepyServlet.java | 4 +-
.../hyracks/util/string/UTF8StringSample.java | 30 +++--
.../hyracks/util/string/UTF8StringUtilTest.java | 2 -
34 files changed, 471 insertions(+), 117 deletions(-)
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java
index badb568,0000000..ffcb74d
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NetDiagnosticsApiServlet.java
@@@ -1,63 -1,0 +1,63 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.application.NCServiceContext;
+import org.apache.hyracks.control.nc.net.NetworkManager;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Servlet whose GET handler writes, as JSON, the state reported by the mux/demux of the
+ * {@link NetworkManager} that is reached through the supplied {@link INcApplicationContext}.
+ */
+public class NetDiagnosticsApiServlet extends AbstractServlet {
+
+ // application context used on every request to look up the NetworkManager
+ private final INcApplicationContext appCtx;
+
+ /**
+ * @param ctx
+ * servlet context map, handed to {@link AbstractServlet}
+ * @param appCtx
+ * application context from which the network diagnostics are obtained
+ * @param paths
+ * paths handed to {@link AbstractServlet}
+ */
+ public NetDiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, INcApplicationContext appCtx, String... paths) {
+ super(ctx, paths);
+ this.appCtx = appCtx;
+ }
+
+ /**
+ * Declares an application/json response, sets status 200 and writes the diagnostics node to the response writer.
+ * NOTE(review): this merge passes the request to setContentType in place of a fixed UTF-8 encoding; presumably the
+ * response charset is then derived from the request -- confirm against HttpUtil.setContentType.
+ */
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws IOException {
- HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
++ HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
+ response.setStatus(HttpResponseStatus.OK);
+ final JsonNode netDiagnostics = getNetDiagnostics();
+ final PrintWriter responseWriter = response.writer();
+ JSONUtil.writeNode(responseWriter, netDiagnostics);
+ }
+
+ /**
+ * Navigates service context -> controller service -> network manager -> mux/demux and returns its state.
+ * The two unchecked casts mean a ClassCastException is raised if the context is not backed by an
+ * NCServiceContext / NodeControllerService.
+ */
+ private JsonNode getNetDiagnostics() {
+ final NCServiceContext serviceContext = (NCServiceContext) appCtx.getServiceContext();
+ final NodeControllerService controllerService = (NodeControllerService) serviceContext.getControllerService();
+ final NetworkManager networkManager = controllerService.getNetworkManager();
+ return networkManager.getMuxDemux().getState();
+ }
+}
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index a34f6b4,625834f..31b784f
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@@ -122,8 -114,20 +122,8 @@@ public class QueryServiceServlet extend
}
@Override
-- protected void post(IServletRequest request, IServletResponse response) {
- try {
- handleRequest(request, response);
- } catch (IOException e) {
- // Servlet methods should not throw exceptions
- // http://cwe.mitre.org/data/definitions/600.html
- GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e);
- } catch (Throwable th) {// NOSONAR: Logging and re-throwing
- try {
- GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, th.getMessage(), th);
- } catch (Throwable ignored) { // NOSONAR: Logging failure
- }
- throw th;
- }
++ protected void post(IServletRequest request, IServletResponse response) throws IOException {
+ handleRequest(request, response);
}
@Override
@@@ -509,21 -508,39 +509,21 @@@
return "http://" + host + path + handlePath(delivery);
}
- private void handleRequest(IServletRequest request, IServletResponse response) {
+ private void handleRequest(IServletRequest request, IServletResponse response) throws IOException {
- QueryServiceRequestParameters param = getRequestParameters(request);
- LOGGER.info("handleRequest: {}", param);
+ final IRequestReference requestRef = receptionist.welcome(request);
long elapsedStart = System.nanoTime();
- HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
- final PrintWriter httpWriter = response.writer();
-
- ResultDelivery delivery = parseResultDelivery(param.getMode());
-
- final ResultProperties resultProperties = param.getMaxResultReads() == null ? new ResultProperties(delivery)
- : new ResultProperties(delivery, Long.parseLong(param.getMaxResultReads()));
-
- String handleUrl = getHandleUrl(param.getHost(), param.getPath(), delivery);
- SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter);
- SessionConfig sessionConfig = sessionOutput.config();
-
+ long errorCount = 1;
Stats stats = new Stats();
RequestExecutionState execution = new RequestExecutionState();
-
- // buffer the output until we are ready to set the status of the response message correctly
- sessionOutput.hold();
- sessionOutput.out().print("{\n");
- printRequestId(sessionOutput.out());
- printClientContextID(sessionOutput.out(), param);
- printSignature(sessionOutput.out(), param);
- printType(sessionOutput.out(), sessionConfig);
- long errorCount = 1; // so far we just return 1 error
- List<ExecutionWarning> warnings = Collections.emptyList(); // we don't have any warnings yet
+ List<ExecutionWarning> warnings = Collections.emptyList();
++ HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
+ PrintWriter httpWriter = response.writer();
+ SessionOutput sessionOutput = createSessionOutput(httpWriter);
+ QueryServiceRequestParameters param = new QueryServiceRequestParameters();
try {
- if (param.getStatement() == null || param.getStatement().isEmpty()) {
- throw new RuntimeDataException(ErrorCode.NO_STATEMENT_PROVIDED);
- }
- String statementsText = param.getStatement() + ";";
+ // buffer the output until we are ready to set the status of the response message correctly
+ sessionOutput.hold();
+ sessionOutput.out().print("{\n");
- HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
Map<String, String> optionalParams = null;
if (optionalParamProvider != null) {
optionalParams = optionalParamProvider.apply(request);
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 0aadb1e,e72bee9..2b2bdef
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@@ -18,12 -18,15 +18,16 @@@
*/
package org.apache.hyracks.http.server;
+ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+ import java.nio.charset.Charset;
+ import java.nio.charset.StandardCharsets;
import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@@ -64,23 -67,22 +68,22 @@@ public class ChunkedResponse implement
private static final Logger LOGGER = LogManager.getLogger();
private final ChannelHandlerContext ctx;
private final ChunkedNettyOutputStream outputStream;
- private final PrintWriter writer;
+ private final HttpServerHandler<?> handler;
+ private PrintWriter writer;
- private HttpResponse response;
+ private DefaultHttpResponse response;
private boolean headerSent;
private ByteBuf error;
private ChannelFuture future;
private boolean done;
- private final boolean keepAlive;
- public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
+ public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request,
+ int chunkSize) {
+ this.handler = handler;
this.ctx = ctx;
outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
- writer = new PrintWriter(outputStream);
response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
- keepAlive = HttpUtil.isKeepAlive(request);
- response.headers().set(HttpHeaderNames.CONNECTION,
- keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);
+ HttpUtil.setConnectionHeader(request, response);
}
@Override
@@@ -104,10 -114,14 +115,14 @@@
@Override
public void close() throws IOException {
- writer.close();
+ if (writer != null) {
+ writer.close();
+ } else {
+ outputStream.close();
+ }
if (error == null && response.status() == HttpResponseStatus.OK) {
if (!done) {
- future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ respond(LastHttpContent.EMPTY_LAST_CONTENT);
}
} else {
// There was an error
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 2b1e8b4,85a0a43..f1ff92c
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@@ -21,10 -22,12 +22,13 @@@ import java.io.BufferedWriter
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+ import java.nio.charset.Charset;
+ import java.nio.charset.StandardCharsets;
import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.utils.HttpUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
@@@ -40,28 -45,38 +44,29 @@@ import io.netty.handler.codec.http.Http
public class FullResponse implements IServletResponse {
private final ChannelHandlerContext ctx;
private final ByteArrayOutputStream baos;
- private final PrintWriter writer;
- private final FullHttpResponse response;
- private final boolean keepAlive;
+ private final DefaultFullHttpResponse response;
+ private final HttpServerHandler<?> handler;
+ private PrintWriter writer;
private ChannelFuture future;
- public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
+ public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request) {
+ this.handler = handler;
this.ctx = ctx;
baos = new ByteArrayOutputStream();
- writer = new PrintWriter(baos);
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
- keepAlive = HttpUtil.isKeepAlive(request);
- if (keepAlive) {
- response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
- }
+ HttpUtil.setConnectionHeader(request, response);
}
@Override
public void close() throws IOException {
- writer.close();
+ if (writer != null) {
+ writer.close();
+ }
FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray()));
- if (keepAlive) {
- if (response.status() == HttpResponseStatus.OK || response.status() == HttpResponseStatus.UNAUTHORIZED) {
- fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
- } else {
- fullResponse.headers().remove(HttpHeaderNames.CONNECTION);
- }
- }
- future = ctx.writeAndFlush(fullResponse);
- if (response.status() != HttpResponseStatus.OK && response.status() != HttpResponseStatus.UNAUTHORIZED) {
- future.addListener(ChannelFutureListener.CLOSE);
- }
+ fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
+ final ChannelPromise responseCompletionPromise = ctx.newPromise();
+ responseCompletionPromise.addListener(handler);
+ future = ctx.writeAndFlush(fullResponse, responseCompletionPromise);
}
@Override
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 1fd8cd2,b34b9dc..1a51318
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@@ -29,17 -35,17 +35,21 @@@ import org.apache.hyracks.http.api.ISer
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.BaseRequest;
import org.apache.hyracks.http.server.FormUrlEncodedRequest;
+ import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.Logger;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.util.AsciiString;
public class HttpUtil {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final Pattern PARENT_DIR = Pattern.compile("/[^./]+/\\.\\./");
+ private static final String DEFAULT_RESPONSE_CHARSET = StandardCharsets.UTF_8.name();
private HttpUtil() {
}
@@@ -159,9 -169,77 +174,83 @@@
return clusterURL;
}
+ /**
+ * Sets the Connection header of {@code response}: "keep-alive" when netty's HttpUtil reports the request as
+ * keep-alive, "close" otherwise. Any previously set Connection header value is replaced.
+ *
+ * @param request
+ * the request whose keep-alive status is inspected
+ * @param response
+ * the response whose headers are modified
+ */
+ public static void setConnectionHeader(HttpRequest request, DefaultHttpResponse response) {
+ // fully qualified: this class is itself named HttpUtil
+ final boolean keepAlive = io.netty.handler.codec.http.HttpUtil.isKeepAlive(request);
+ final AsciiString connectionHeaderValue = keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE;
+ response.headers().set(HttpHeaderNames.CONNECTION, connectionHeaderValue);
+ }
++
+ /**
+ * Convenience overload of {@link #getPreferredCharset(IServletRequest, String)} that falls back to
+ * DEFAULT_RESPONSE_CHARSET (the name of UTF-8).
+ *
+ * @param request
+ * the request whose Accept-Charset header is consulted
+ * @return the name of the charset to use for the response
+ */
+ public static String getPreferredCharset(IServletRequest request) {
+ return getPreferredCharset(request, DEFAULT_RESPONSE_CHARSET);
+ }
+
+ /**
+ * Picks the response charset from the request's Accept-Charset header.
+ * The header elements are ordered by descending weight (elements of equal weight keep their header order, since
+ * the sort of an ordered stream is stable), "*" is replaced by {@code defaultCharset}, and the first element that
+ * names a charset supported by this JVM wins. Elements naming unsupported or syntactically illegal charsets are
+ * logged and disregarded.
+ *
+ * @param request
+ * the request whose Accept-Charset header is consulted
+ * @param defaultCharset
+ * returned when the header is absent or contains no usable charset
+ * @return the preferred charset name, or {@code defaultCharset}
+ */
+ public static String getPreferredCharset(IServletRequest request, String defaultCharset) {
+ String acceptCharset = request.getHeader(HttpHeaderNames.ACCEPT_CHARSET);
+ if (acceptCharset == null) {
+ return defaultCharset;
+ }
+ // If no "q" parameter is present, the default weight is 1 [https://tools.ietf.org/html/rfc7231#section-5.3.1]
+ Optional<String> preferredCharset = Stream.of(StringUtils.split(acceptCharset, ","))
+ .map(WeightedHeaderValue::new).sorted().map(WeightedHeaderValue::getValue)
+ .map(a -> "*".equals(a) ? defaultCharset : a).filter(value -> {
+ try {
+ if (Charset.isSupported(value)) {
+ return true;
+ }
+ } catch (IllegalArgumentException e) {
+ // Charset.isSupported throws IllegalCharsetNameException (an IllegalArgumentException) instead of
+ // returning false when the name is empty or has illegal characters; the header is client-supplied,
+ // so such an element must be skipped rather than fail the request
+ LOGGER.info("disregarding illegal charset '{}'", value);
+ return false;
+ }
+ LOGGER.info("disregarding unsupported charset '{}'", value);
+ return false;
+ }).findFirst();
+ return preferredCharset.orElse(defaultCharset);
+ }
+
+ /**
+ * One element of a weighted header list (e.g. Accept-Charset): the value plus its "q" weight.
+ * Natural ordering is by descending weight only, so it is deliberately not consistent with equals, which also
+ * compares the value.
+ */
+ private static class WeightedHeaderValue implements Comparable<WeightedHeaderValue> {
+
+ final String value;
+ final double weight;
+
+ /**
+ * Parses a single list element. Never throws on malformed input, since the text comes from a client header.
+ *
+ * @param value
+ * one comma-separated element of the header, e.g. "iso-8859-1;q=0.5"
+ */
+ WeightedHeaderValue(String value) {
+ // Accept-Charset = 1#( ( charset / "*" ) [ weight ] )
+ // weight = OWS ";" OWS "q=" qvalue
+ String[] splits = StringUtils.split(value, ";");
+ // an element consisting solely of ';' yields no tokens at all
+ this.value = splits.length == 0 ? "" : splits[0].trim();
+ this.weight = parseWeight(splits);
+ }
+
+ /**
+ * Returns the weight given by the first "q=" parameter after the value, or the default weight 1 when there is
+ * no such parameter or its qvalue cannot be parsed as a number.
+ */
+ private static double parseWeight(String[] splits) {
+ for (int i = 1; i < splits.length; i++) {
+ String segment = splits[i].trim();
+ // case-insensitive prefix test that does not depend on the default locale
+ if (!segment.regionMatches(true, 0, "q=", 0, 2)) {
+ continue;
+ }
+ try {
+ double parsed = Double.parseDouble(segment.substring(2).trim());
+ if (!Double.isNaN(parsed)) {
+ return parsed;
+ }
+ } catch (NumberFormatException e) {
+ // fall through: treated like an absent weight
+ }
+ LOGGER.info("disregarding malformed weight '{}'", segment);
+ return 1.0d;
+ }
+ return 1.0d;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public double getWeight() {
+ return weight;
+ }
+
+ /** Higher weights sort first. */
+ @Override
+ public int compareTo(WeightedHeaderValue o) {
+ return Double.compare(o.weight, weight);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WeightedHeaderValue that = (WeightedHeaderValue) o;
+ return Double.compare(that.weight, weight) == 0 && Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, weight);
+ }
+ }
}
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
index 0c96eb6,0000000..c0117f1
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
@@@ -1,140 -1,0 +1,140 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
+import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequester;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.HttpServerConfig;
+import org.apache.hyracks.http.server.HttpServerConfigBuilder;
+import org.apache.hyracks.http.server.InterruptOnCloseHandler;
+import org.apache.hyracks.http.server.WebManager;
- import org.apache.hyracks.http.servlet.SleepyServlet;
++import org.apache.hyracks.test.http.servlet.SleepyServlet;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Sends two GET requests pipelined over a single connection to a server hosting a SleepyServlet and checks that no
+ * response that is received carries status 200.
+ */
+public class PipelinedRequestsTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int PORT = 9898;
+ private static final String PATH = "/";
+
+ /**
+ * NOTE(review): executePipelined returns an empty list when the exchange fails, is cancelled, or does not finish
+ * within its 5 second wait, in which case the loop below asserts nothing and the test passes vacuously -- confirm
+ * whether that is intended.
+ */
+ @Test
+ public void pipelinedRequests() throws Exception {
+ setupServer();
+ final HttpHost target = new HttpHost("localhost", PORT);
+ final List<BasicAsyncRequestProducer> requestProducers =
+ Arrays.asList(new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)),
+ new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)));
+ final List<BasicAsyncResponseConsumer> responseConsumers =
+ Arrays.asList(new BasicAsyncResponseConsumer(), new BasicAsyncResponseConsumer());
+ final List<HttpResponse> httpResponses = executePipelined(target, requestProducers, responseConsumers);
+ for (HttpResponse response : httpResponses) {
+ Assert.assertNotEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
+ }
+ }
+
+ /**
+ * Starts an HTTP server on PORT with a SleepyServlet registered at PATH.
+ * NOTE(review): the WebManager is local to this method and nothing in this class stops it.
+ */
+ private void setupServer() throws Exception {
+ final WebManager webMgr = new WebManager();
+ final HttpServerConfig config =
+ HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
+ final HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config, InterruptOnCloseHandler.INSTANCE);
+ final SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ }
+
+ /**
+ * Executes the given requests pipelined against {@code host} and waits up to 5 seconds for the outcome.
+ *
+ * @return the responses handed to the completion callback; empty if the exchange failed, was cancelled, or had
+ * not completed when the wait ended
+ */
+ private List<HttpResponse> executePipelined(HttpHost host, List<BasicAsyncRequestProducer> requestProducers,
+ List<BasicAsyncResponseConsumer> responseConsumers) throws Exception {
+ final List<HttpResponse> results = new ArrayList<>();
+ final HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
+ final IOEventDispatch ioEventDispatch =
+ new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);
+ final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
+ final BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT);
+ // limit the pool to one connection so that both requests share it
+ pool.setDefaultMaxPerRoute(1);
+ pool.setMaxTotal(1);
+ // the reactor's execute call runs on its own thread; ioReactor.shutdown() is called below
+ final Thread reactorThread = new Thread(() -> {
+ try {
+ ioReactor.execute(ioEventDispatch);
+ } catch (final IOException e) {
+ LOGGER.error(e);
+ }
+ });
+ reactorThread.start();
+ final HttpCoreContext context = HttpCoreContext.create();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final HttpProcessor httpProc =
+ HttpProcessorBuilder.create().add(new RequestContent()).add(new RequestTargetHost())
+ .add(new RequestConnControl()).add(new RequestExpectContinue(true)).build();
+ final HttpAsyncRequester requester = new HttpAsyncRequester(httpProc);
+ requester.executePipelined(host, requestProducers, responseConsumers, pool, context,
+ new FutureCallback<List<HttpResponse>>() {
+ @Override
+ public void completed(final List<HttpResponse> result) {
+ results.addAll(result);
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ // the failure itself is not recorded; only the latch is released
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ latch.countDown();
+ }
+ });
+ // the boolean result of await (false on timeout) is not inspected
+ latch.await(5, TimeUnit.SECONDS);
+ ioReactor.shutdown();
+ return results;
+ }
+}
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
index c1d1315,ea52b0b..e341e2f
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
@@@ -33,22 -32,13 +33,21 @@@ import java.util.concurrent.Executors
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
- import org.apache.hyracks.http.HttpTestUtil;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.HttpServerConfig;
import org.apache.hyracks.http.server.HttpServerConfigBuilder;
import org.apache.hyracks.http.server.InterruptOnCloseHandler;
import org.apache.hyracks.http.server.WebManager;
- import org.apache.hyracks.http.servlet.ChattyServlet;
- import org.apache.hyracks.http.servlet.EchoServlet;
- import org.apache.hyracks.http.servlet.SleepyServlet;
+ import org.apache.hyracks.test.http.servlet.ChattyServlet;
++import org.apache.hyracks.test.http.servlet.EchoServlet;
+ import org.apache.hyracks.test.http.servlet.SleepyServlet;
import org.apache.hyracks.util.StorageUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@@ -379,42 -369,6 +378,36 @@@ public class HttpServerTest
Assert.assertNotNull(failure);
}
- public static void setPrivateField(Object obj, String filedName, Object value) throws Exception {
- Field f = obj.getClass().getDeclaredField(filedName);
- f.setAccessible(true);
- f.set(obj, value);
- }
-
+ /**
+ * Posts an 8 KB chunked body to an EchoServlet on a server whose maximum request chunk size is 1 KB and verifies
+ * that the response has status 200 and echoes the body unchanged.
+ */
+ @Test
+ public void chunkedRequestTest() throws Exception {
+ final WebManager webMgr = new WebManager();
+ final int serverRequestChunkSize = StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.KILOBYTE);
+ final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16)
+ .setMaxRequestChunkSize(serverRequestChunkSize).build();
+ final HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config);
+ EchoServlet servlet = new EchoServlet(server.ctx(), PATH);
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT,
+ HttpServerTest.PATH, null, null);
+ final HttpPost postRequest = new HttpPost(uri);
+ // the body is several times larger than the server's maximum request chunk size
+ final int requestSize = StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE);
+ final String requestBody = RandomStringUtils.randomAlphanumeric(requestSize);
+ final StringEntity chunkedEntity = new StringEntity(requestBody);
+ chunkedEntity.setChunked(true);
+ postRequest.setEntity(chunkedEntity);
+ try (CloseableHttpResponse response = httpClient.execute(postRequest)) {
+ final String responseBody = EntityUtils.toString(response.getEntity());
+ // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly
+ Assert.assertEquals(HttpResponseStatus.OK.code(), response.getStatusLine().getStatusCode());
+ Assert.assertEquals(requestBody, responseBody);
+ }
+ } finally {
+ webMgr.stop();
+ }
+ }
+
private void request(int count) throws URISyntaxException {
request(count, 0);
}
diff --cc hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/servlet/EchoServlet.java
index 9298aed,0000000..2da86f3
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/servlet/EchoServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/servlet/EchoServlet.java
@@@ -1,46 -1,0 +1,46 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
- package org.apache.hyracks.http.servlet;
++package org.apache.hyracks.test.http.servlet;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * A servlet that echoes the received request body
+ */
+public class EchoServlet extends AbstractServlet {
+
+ /**
+ * @param ctx
+ * servlet context map, handed to {@link AbstractServlet}
+ * @param paths
+ * paths handed to {@link AbstractServlet}
+ */
+ public EchoServlet(ConcurrentMap<String, Object> ctx, String... paths) {
+ super(ctx, paths);
+ }
+
+ /**
+ * Reads the request body, sets status 200 and a text/plain content type, then writes the body back unchanged.
+ * NOTE(review): this merge passes the request to setContentType in place of a fixed UTF-8 encoding; presumably the
+ * response charset is then derived from the request -- confirm against HttpUtil.setContentType.
+ */
+ @Override
+ protected void post(IServletRequest request, IServletResponse response) throws Exception {
+ final String requestBody = HttpUtil.getRequestBody(request);
+ response.setStatus(HttpResponseStatus.OK);
- HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8);
++ HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, request);
+ response.writer().write(requestBody);
+ }
+}