You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/09/06 16:00:43 UTC

asterixdb git commit: [NO ISSUE][OTH] Improve Http Server

Repository: asterixdb
Updated Branches:
  refs/heads/master 10cc52d92 -> 9b9dc22a2


[NO ISSUE][OTH] Improve Http Server

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Log estimated input memory budget
- Ensure all allocated input memory buffers are 4K. This reduces
  the chance of memory allocation to go beyond budget. This also
  allows input and output to share buffers of this size since
  it is the size of choice for reading and writing.
- Reject requests that go beyond server capacity before reading
  them which reduces wasted resources.
- Allow configurations of number of bosses and workers for an
  Http web manager.
- Push the limit further in Http server tests through:
  -- Increase the size of the single request to ~100KB.
  -- Increase the number of rejected requests.

Change-Id: I7adcd59047805dc384e1c119191eff995c6e9a7a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1991
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9b9dc22a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9b9dc22a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9b9dc22a

Branch: refs/heads/master
Commit: 9b9dc22a25ce5360f70c68c98c4a22e079ad5891
Parents: 10cc52d
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Tue Sep 5 18:09:39 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Sep 6 09:00:19 2017 -0700

----------------------------------------------------------------------
 .../http/server/ChunkedNettyOutputStream.java   |  13 ++-
 .../server/HttpRequestCapacityController.java   | 105 +++++++++++++++++++
 .../apache/hyracks/http/server/HttpServer.java  |  22 +++-
 .../http/server/HttpServerInitializer.java      |   3 +-
 .../apache/hyracks/http/server/WebManager.java  |  32 +++++-
 .../hyracks/http/server/utils/HttpUtil.java     |  60 +++++++++++
 .../hyracks/http/servlet/ChattyServlet.java     |  62 +----------
 .../hyracks/http/test/HttpServerTest.java       |  28 +++--
 8 files changed, 243 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 0066b77..e4f0777 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -88,7 +88,9 @@ public class ChunkedNettyOutputStream extends OutputStream {
                 try {
                     flush();
                 } finally {
-                    buffer.release();
+                    if (buffer != null) {
+                        buffer.release();
+                    }
                 }
             } else {
                 response.fullReponse(buffer);
@@ -101,12 +103,16 @@ public class ChunkedNettyOutputStream extends OutputStream {
     @Override
     public void flush() throws IOException {
         ensureWritable();
-        if (buffer.readableBytes() > 0) {
+        if (buffer != null && buffer.readableBytes() > 0) {
             if (response.status() == HttpResponseStatus.OK) {
                 int size = buffer.capacity();
                 response.beforeFlush();
                 DefaultHttpContent content = new DefaultHttpContent(buffer);
-                ctx.write(content, ctx.channel().voidPromise());
+                ctx.writeAndFlush(content, ctx.channel().voidPromise());
+                // The responisbility of releasing the buffer is now with the netty pipeline since it is forwarded
+                // within the http content. We must nullify buffer before we allocate the next one to avoid
+                // releasing the buffer twice in case the allocation call fails.
+                buffer = null;
                 buffer = ctx.alloc().buffer(size);
             } else {
                 ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes());
@@ -120,7 +126,6 @@ public class ChunkedNettyOutputStream extends OutputStream {
     private synchronized void ensureWritable() throws IOException {
         while (!ctx.channel().isWritable()) {
             try {
-                ctx.flush();
                 if (!ctx.channel().isOpen()) {
                     throw new IOException("Closed channel");
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
new file mode 100644
index 0000000..e1f9e5a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.PromiseNotificationUtil;
+
+/**
+ * A handler to do input control... as a single pipeline can create many requests
+ * The remaining capacity of the server executor queue is incremented only when a request has been fully read
+ * Therefore, there is a window where requests can be read and get rejected later when they are
+ * submitted to the server executor
+ */
+public class HttpRequestCapacityController extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOGGER = Logger.getLogger(HttpRequestCapacityController.class.getName());
+    private final HttpServer server;
+    private boolean overloaded = false;
+
+    public HttpRequestCapacityController(HttpServer server) {
+        this.server = server;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (overloaded) {
+            ReferenceCountUtil.release(msg);
+            return;
+        }
+        if (overloaded()) {
+            ReferenceCountUtil.release(msg);
+            reject(ctx);
+            return;
+        } else {
+            super.channelRead(ctx, msg);
+        }
+    }
+
+    public static void reject(ChannelHandlerContext ctx) {
+        HttpResponseEncoder encoder = new HttpResponseEncoder();
+        ChannelPromise promise = ctx.newPromise();
+        promise.addListener(ChannelFutureListener.CLOSE);
+        DefaultFullHttpResponse response =
+                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE);
+        try {
+            encoder.write(ctx, response, ctx.voidPromise());
+            ctx.writeAndFlush(ctx.alloc().buffer(0), promise);
+        } catch (Throwable th) {//NOSONAR
+            try {
+                LOGGER.log(Level.SEVERE, "Failure during request rejection", th);
+            } catch (Throwable loggingFailure) {//NOSONAR
+            }
+            PromiseNotificationUtil.tryFailure(promise, th, null);
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        if (overloaded()) {
+            reject(ctx);
+            return;
+        }
+        // We disable auto read to avoid reading at all if we can't handle any more requests
+        ctx.read();
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        ctx.read();
+        super.channelReadComplete(ctx);
+    }
+
+    private boolean overloaded() {
+        overloaded = overloaded || server.getExecutor().getQueue().remainingCapacity() == 0;
+        return overloaded;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 56f454f..44d4dfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +37,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.FullHttpRequest;
@@ -50,6 +50,9 @@ public class HttpServer {
     private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
     protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
             new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK);
+    protected static final int RECEIVE_BUFFER_SIZE = 4096;
+    protected static final int DEFAULT_NUM_EXECUTOR_THREADS = 16;
+    protected static final int DEFAULT_REQUEST_QUEUE_SIZE = 256;
     private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName());
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
@@ -65,14 +68,14 @@ public class HttpServer {
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final int port;
-    private final ExecutorService executor;
+    private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
     private Channel channel;
     private Throwable cause;
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
-        this(bossGroup, workerGroup, port, 16, 256);
+        this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE);
     }
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
@@ -87,7 +90,14 @@ public class HttpServer {
                 runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
         long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
                 + numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE;
-        LOGGER.log(Level.INFO, "The direct memory budget for this server is " + directMemoryBudget + " bytes");
+        LOGGER.log(Level.INFO, "The output direct memory budget for this server is " + directMemoryBudget + " bytes");
+        long inputBudgetEstimate =
+                (long) HttpServerInitializer.MAX_REQUEST_INITIAL_LINE_LENGTH * (requestQueueSize + numExecutorThreads);
+        inputBudgetEstimate = inputBudgetEstimate * 2;
+        LOGGER.log(Level.INFO,
+                "The \"estimated\" input direct memory budget for this server is " + inputBudgetEstimate + " bytes");
+        // Having multiple arenas, memory fragments, and local thread cached buffers
+        // can cause the input memory usage to exceed estimate and custom buffer allocator must be used to avoid this
     }
 
     public final void start() throws Exception { // NOSONAR
@@ -199,6 +209,8 @@ public class HttpServer {
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
+                .childOption(ChannelOption.AUTO_READ, Boolean.FALSE)
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
@@ -264,7 +276,7 @@ public class HttpServer {
         return new HttpServerHandler<>(this, chunkSize);
     }
 
-    public ExecutorService getExecutor() {
+    public ThreadPoolExecutor getExecutor() {
         return executor;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index a32da39..bc173fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -31,7 +31,7 @@ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
     public static final int MAX_REQUEST_HEADER_SIZE = 262144;
     public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
     public static final int RESPONSE_CHUNK_SIZE = 4096;
-    private HttpServer server;
+    private final HttpServer server;
 
     public HttpServerInitializer(HttpServer server) {
         this.server = server;
@@ -40,6 +40,7 @@ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
     @Override
     public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
+        p.addLast(new HttpRequestCapacityController(server));
         p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, MAX_REQUEST_HEADER_SIZE,
                 MAX_REQUEST_CHUNK_SIZE));
         p.addLast(new HttpResponseEncoder());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
index 4a09f78..55741e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
@@ -30,10 +30,38 @@ public class WebManager {
     private final EventLoopGroup bosses;
     private final EventLoopGroup workers;
 
+    /**
+     * Create a web manager with number of bosses = 1
+     * and number of workers = MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS
+     * The default can be set using -Dio.netty.eventLoopThreads
+     * Otherwise, it is set to Runtime.getRuntime().availableProcessors() * 2
+     */
     public WebManager() {
+        this(1, 0);
+    }
+
+    /**
+     * Create a web manager with number of bosses = 1 and number of workers = numWorkers
+     *
+     * @param numWorkers
+     *            number of worker threads
+     */
+    public WebManager(int numWorkers) {
+        this(1, numWorkers);
+    }
+
+    /**
+     * Create a web manager with number of bosses = numBosses and number of workers = numWorkers
+     *
+     * @param numBosses
+     *            number of boss threads
+     * @param numWorkers
+     *            number of worker threads
+     */
+    public WebManager(int numBosses, int numWorkers) {
         servers = new ArrayList<>();
-        bosses = new NioEventLoopGroup(1);
-        workers = new NioEventLoopGroup();
+        bosses = new NioEventLoopGroup(numBosses);
+        workers = new NioEventLoopGroup(numWorkers);
     }
 
     public List<HttpServer> getServers() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index fa3cc57..8240bce 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -19,9 +19,16 @@
 package org.apache.hyracks.http.server.utils;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.regex.Pattern;
 
 import org.apache.hyracks.http.api.IServletRequest;
@@ -29,13 +36,17 @@ import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.BaseRequest;
 import org.apache.hyracks.http.server.FormUrlEncodedRequest;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpRequest;
+import io.netty.util.internal.PlatformDependent;
 
 public class HttpUtil {
 
+    private static final Logger LOGGER = Logger.getLogger(HttpUtil.class.getName());
     private static final Pattern PARENT_DIR = Pattern.compile("/[^./]+/\\.\\./");
+    private static long maxMemUsage = 0L;
 
     private HttpUtil() {
     }
@@ -145,4 +156,53 @@ public class HttpUtil {
         return clusterURL;
     }
 
+    @SuppressWarnings("restriction")
+    public static synchronized void printMemUsage() {
+        StringBuilder report = new StringBuilder();
+        report.append("sun.misc.VM.maxDirectMemory: ");
+        report.append(sun.misc.VM.maxDirectMemory());
+        report.append('\n');
+        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): ");
+        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
+        report.append('\n');
+        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): ");
+        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
+        report.append('\n');
+        report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
+        report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
+        report.append('\n');
+        report.append("---------------------------- Beans ----------------------------");
+        report.append('\n');
+        List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
+        for (MemoryPoolMXBean bean : memPoolBeans) {
+            if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
+                report.append(bean.getName());
+                report.append(": ");
+                report.append(bean.getUsage());
+                report.append('\n');
+            }
+        }
+        report.append("---------------------------- Netty ----------------------------");
+        report.append('\n');
+        try {
+            Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+            field.setAccessible(true);
+            AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
+            long used = usedDirectMemory.get();
+            report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            report.append(used);
+            report.append('\n');
+            report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            maxMemUsage = Math.max(maxMemUsage, used);
+            report.append(maxMemUsage);
+            report.append('\n');
+            report.append('\n');
+        } catch (Throwable th) { // NOSONAR
+            LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th);
+            return;
+        }
+        report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------");
+        report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
+        LOGGER.log(Level.INFO, report.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index bf0452b..5bd2e38 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -18,13 +18,7 @@
  */
 package org.apache.hyracks.http.servlet;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.management.MemoryType;
-import java.lang.reflect.Field;
-import java.util.List;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -33,13 +27,10 @@ import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.util.internal.PlatformDependent;
 
 public class ChattyServlet extends AbstractServlet {
     private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName());
-    private static long MAX = 0L;
     private byte[] bytes;
 
     public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
@@ -66,57 +57,6 @@ public class ChattyServlet extends AbstractServlet {
         for (int i = 0; i < 100; i++) {
             response.outputStream().write(bytes);
         }
-        printMemUsage();
-    }
-
-    @SuppressWarnings("restriction")
-    public synchronized static void printMemUsage() {
-        StringBuilder report = new StringBuilder();
-        report.append("sun.misc.VM.maxDirectMemory: ");
-        report.append(sun.misc.VM.maxDirectMemory());
-        report.append('\n');
-        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): ");
-        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
-        report.append('\n');
-        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): ");
-        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
-        report.append('\n');
-        report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
-        report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
-        report.append('\n');
-        report.append("---------------------------- Beans ----------------------------");
-        report.append('\n');
-        List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
-        for (MemoryPoolMXBean bean : memPoolBeans) {
-            if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
-                report.append(bean.getName());
-                report.append(": ");
-                report.append(bean.getUsage());
-                report.append('\n');
-            }
-        }
-        report.append("---------------------------- Netty ----------------------------");
-        report.append('\n');
-        try {
-            Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
-            field.setAccessible(true);
-            AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
-            long used = usedDirectMemory.get();
-            report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
-            report.append(used);
-            report.append('\n');
-            report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
-            MAX = Math.max(MAX, used);
-            report.append(MAX);
-            report.append('\n');
-            report.append('\n');
-        } catch (Throwable th) {
-            th.printStackTrace();
-            LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th);
-            return;
-        }
-        report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------");
-        report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
-        LOGGER.log(Level.INFO, report.toString());
+        HttpUtil.printMemUsage();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b9dc22a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 66d1b77..a3048ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -45,6 +45,7 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.hyracks.http.servlet.ChattyServlet;
 import org.apache.hyracks.http.servlet.SleepyServlet;
 import org.junit.Assert;
@@ -62,6 +63,7 @@ public class HttpServerTest {
     static final AtomicInteger SUCCESS_COUNT = new AtomicInteger();
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
+    static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger();
     static final List<Future<Void>> FUTURES = new ArrayList<>();
     static final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -70,6 +72,7 @@ public class HttpServerTest {
         SUCCESS_COUNT.set(0);
         UNAVAILABLE_COUNT.set(0);
         OTHER_COUNT.set(0);
+        EXCEPTION_COUNT.set(0);
     }
 
     @Test
@@ -77,7 +80,7 @@ public class HttpServerTest {
         WebManager webMgr = new WebManager();
         int numExecutors = 16;
         int serverQueueSize = 16;
-        int numRequests = 48;
+        int numRequests = 128;
         HttpServer server =
                 new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
         SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
@@ -100,7 +103,9 @@ public class HttpServerTest {
                 f.get();
             }
             Assert.assertEquals(expectedSuccess, SUCCESS_COUNT.get());
-            Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get());
+            Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get() + EXCEPTION_COUNT.get());
+            System.err.println("Number of rejections: " + UNAVAILABLE_COUNT.get());
+            System.err.println("Number of exceptions: " + EXCEPTION_COUNT.get());
             Assert.assertEquals(0, OTHER_COUNT.get());
         } catch (Throwable th) {
             th.printStackTrace();
@@ -111,7 +116,7 @@ public class HttpServerTest {
     }
 
     private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception {
-        int maxAttempts = 5;
+        int maxAttempts = 15;
         int attempt = 0;
         int queued = server.getWorkQueueSize();
         while (queued != expectedQueued) {
@@ -144,7 +149,7 @@ public class HttpServerTest {
         try {
             try {
                 for (int i = 0; i < numPatches; i++) {
-                    ChattyServlet.printMemUsage();
+                    HttpUtil.printMemUsage();
                     request(numRequests);
                     for (Future<Void> f : FUTURES) {
                         f.get();
@@ -152,14 +157,17 @@ public class HttpServerTest {
                     FUTURES.clear();
                 }
             } finally {
-                ChattyServlet.printMemUsage();
+                HttpUtil.printMemUsage();
                 servlet.wakeUp();
                 for (Future<Void> f : stuck) {
                     f.get();
                 }
             }
         } finally {
+            System.err.println("Number of rejections: " + UNAVAILABLE_COUNT.get());
+            System.err.println("Number of exceptions: " + EXCEPTION_COUNT.get());
             webMgr.stop();
+            HttpUtil.printMemUsage();
         }
     }
 
@@ -174,7 +182,7 @@ public class HttpServerTest {
         int numRequests = 64;
         int numExecutors = 32;
         int serverQueueSize = 32;
-        ChattyServlet.printMemUsage();
+        HttpUtil.printMemUsage();
         WebManager webMgr = new WebManager();
         HttpServer server =
                 new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
@@ -191,7 +199,9 @@ public class HttpServerTest {
             Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
             Assert.assertEquals(0, OTHER_COUNT.get());
         } finally {
+            HttpUtil.printMemUsage();
             webMgr.stop();
+            HttpUtil.printMemUsage();
         }
     }
 
@@ -261,8 +271,8 @@ public class HttpServerTest {
                     }
                     IOUtils.closeQuietly(in);
                 } catch (Throwable th) {
-                    th.printStackTrace();
-                    throw th;
+                    // Server closed connection before we complete writing..
+                    EXCEPTION_COUNT.incrementAndGet();
                 }
                 return null;
             });
@@ -291,7 +301,7 @@ public class HttpServerTest {
         URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
         RequestBuilder builder = RequestBuilder.post(uri);
         StringBuilder str = new StringBuilder();
-        for (int i = 0; i < 32; i++) {
+        for (int i = 0; i < 2046; i++) {
             str.append("This is a string statement that will be ignored");
             str.append('\n');
         }