You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/02/20 06:05:54 UTC

asterixdb git commit: [NO ISSUE][OTH] Enable adding request channel close listener

Repository: asterixdb
Updated Branches:
  refs/heads/master 15589b42e -> 02825e092


[NO ISSUE][OTH] Enable adding request channel close listener

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Introduce IChannelCloseHandler.handle that gets called when
    the request channel is closed.
  - Add HttpServer.getChannelCloseHandler
  - Add IServlet.getChannelCloseHandler

details:
- Previously, we didn't know that an Http client closed the
  connection until we try to write and find that the channel has
  been closed.
- After this change, the moment the channel is closed, the http
  channel close handler is called.
- A test is added with a handler that interrupts the execution.

Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1972
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/02825e09
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/02825e09
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/02825e09

Branch: refs/heads/master
Commit: 02825e092e1dcd9f6c4b12a15087426374e6d336
Parents: 15589b4
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Feb 19 17:09:02 2018 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Mon Feb 19 22:05:38 2018 -0800

----------------------------------------------------------------------
 .../hyracks/http/api/IChannelClosedHandler.java |  39 ++++++
 .../org/apache/hyracks/http/api/IServlet.java   |  13 ++
 .../apache/hyracks/http/server/HttpServer.java  |  19 ++-
 .../hyracks/http/server/HttpServerHandler.java  |  12 +-
 .../hyracks/http/servlet/SleepyServlet.java     |  17 ++-
 .../hyracks/http/test/HttpRequestTask.java      | 109 +++++++++++++++++
 .../hyracks/http/test/HttpServerTest.java       | 119 +++++++------------
 7 files changed, 249 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java
new file mode 100644
index 0000000..4e433ad
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.api;
+
+import java.util.concurrent.Future;
+
+import org.apache.hyracks.http.server.HttpServer;
+
+@FunctionalInterface
+public interface IChannelClosedHandler {
+
+    /**
+     * Handle a request channel closed event
+     *
+     * @param server
+     *            the server handling the request
+     * @param servlet
+     *            the servlet handling the request
+     * @param task
+     *            the task handling the request
+     */
+    void channelClosed(HttpServer server, IServlet servlet, Future<Void> task);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
index 157eef5..186fb0e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.http.api;
 
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hyracks.http.server.HttpServer;
+
 /**
  * Represents a component that handles IServlet requests
  */
@@ -42,4 +44,15 @@ public interface IServlet {
      * @param response
      */
     void handle(IServletRequest request, IServletResponse response);
+
+    /**
+     * Get the handler for channel close events
+     *
+     * @param server
+     *            the http server
+     * @return the handler for channel close events
+     */
+    default IChannelClosedHandler getChannelClosedHandler(HttpServer server) {
+        return server.getChannelClosedHandler();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 8ce1d70..d971a7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hyracks.http.api.IChannelClosedHandler;
 import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.logging.log4j.Level;
@@ -62,6 +63,7 @@ public class HttpServer {
     private static final int STARTED = 2;
     private static final int STOPPING = 3;
     // Final members
+    private final IChannelClosedHandler closedHandler;
     private final Object lock = new Object();
     private final AtomicInteger threadId = new AtomicInteger();
     private final ConcurrentMap<String, Object> ctx;
@@ -78,14 +80,25 @@ public class HttpServer {
     private Throwable cause;
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
-        this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE);
+        this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE, null);
+    }
+
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port,
+            IChannelClosedHandler closeHandler) {
+        this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE, closeHandler);
     }
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
             int requestQueueSize) {
+        this(bossGroup, workerGroup, port, numExecutorThreads, requestQueueSize, null);
+    }
+
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
+            int requestQueueSize, IChannelClosedHandler closeHandler) {
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
         this.port = port;
+        this.closedHandler = closeHandler;
         ctx = new ConcurrentHashMap<>();
         servlets = new ArrayList<>();
         workQueue = new LinkedBlockingQueue<>(requestQueueSize);
@@ -378,6 +391,10 @@ public class HttpServer {
         return workQueue.size();
     }
 
+    public IChannelClosedHandler getChannelClosedHandler() {
+        return closedHandler;
+    }
+
     @Override
     public String toString() {
         return "{\"class\":\"" + getClass().getSimpleName() + "\",\"port\":" + port + ",\"state\":\"" + getState()

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 9290cdf..2787b30 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -19,8 +19,10 @@
 package org.apache.hyracks.http.server;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.apache.hyracks.http.api.IChannelClosedHandler;
 import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -92,12 +94,16 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
-        submit();
+        submit(ctx, servlet);
     }
 
-    private void submit() throws IOException {
+    private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException {
         try {
-            server.getExecutor(handler).submit(handler);
+            Future<Void> task = server.getExecutor(handler).submit(handler);
+            final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server);
+            if (closeHandler != null) {
+                ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task));
+            }
         } catch (RejectedExecutionException e) { // NOSONAR
             LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage());
             handler.reject();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
index 6bfa0cf..2a5a0a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
@@ -31,6 +31,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 public class SleepyServlet extends AbstractServlet {
 
     private volatile boolean sleep = true;
+    private int numSlept = 0;
 
     public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
@@ -46,8 +47,11 @@ public class SleepyServlet extends AbstractServlet {
         response.setStatus(HttpResponseStatus.OK);
         if (sleep) {
             synchronized (this) {
-                while (sleep) {
-                    this.wait();
+                if (sleep) {
+                    incrementSleptCount();
+                    while (sleep) {
+                        this.wait();
+                    }
                 }
             }
         }
@@ -55,6 +59,15 @@ public class SleepyServlet extends AbstractServlet {
         response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8));
     }
 
+    private void incrementSleptCount() {
+        numSlept++;
+        notifyAll();
+    }
+
+    public int getNumSlept() {
+        return numSlept;
+    }
+
     public synchronized void wakeUp() {
         sleep = false;
         notifyAll();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
new file mode 100644
index 0000000..17f6f9a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpRequestTask implements Callable<Void> {
+
+    protected final HttpUriRequest request;
+
+    protected HttpRequestTask() throws URISyntaxException {
+        request = post(null);
+    }
+
+    @Override
+    public Void call() throws Exception {
+        try {
+            HttpResponse response = executeHttpRequest(request);
+            if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
+                HttpServerTest.SUCCESS_COUNT.incrementAndGet();
+            } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE.code()) {
+                HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet();
+            } else {
+                HttpServerTest.OTHER_COUNT.incrementAndGet();
+            }
+            InputStream in = response.getEntity().getContent();
+            if (HttpServerTest.PRINT_TO_CONSOLE) {
+                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+                String line = null;
+                while ((line = reader.readLine()) != null) {
+                    System.out.println(line);
+                }
+            }
+            IOUtils.closeQuietly(in);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        return null;
+    }
+
+    protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+        HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+        try {
+            return client.execute(method);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    protected HttpUriRequest get(String query) throws URISyntaxException {
+        URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH,
+                query, null);
+        RequestBuilder builder = RequestBuilder.get(uri);
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+
+    protected HttpUriRequest post(String query) throws URISyntaxException {
+        URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH,
+                query, null);
+        RequestBuilder builder = RequestBuilder.post(uri);
+        StringBuilder str = new StringBuilder();
+        for (int i = 0; i < 32; i++) {
+            str.append("This is a string statement that will be ignored");
+            str.append('\n');
+        }
+        String statement = str.toString();
+        builder.setHeader("Content-type", "application/x-www-form-urlencoded");
+        builder.addParameter("statement", statement);
+        builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/02825e09/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 298d2de..7e6ccf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -19,15 +19,12 @@
 package org.apache.hyracks.http.test;
 
 import java.io.BufferedReader;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.Socket;
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -35,14 +32,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -69,6 +58,7 @@ public class HttpServerTest {
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
     static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger();
+    static final List<HttpRequestTask> TASKS = new ArrayList<>();
     static final List<Future<Void>> FUTURES = new ArrayList<>();
     static final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -78,6 +68,8 @@ public class HttpServerTest {
         UNAVAILABLE_COUNT.set(0);
         OTHER_COUNT.set(0);
         EXCEPTION_COUNT.set(0);
+        FUTURES.clear();
+        TASKS.clear();
     }
 
     @Test
@@ -303,76 +295,57 @@ public class HttpServerTest {
         }
     }
 
+    @Test
+    public void testInterruptOnClientClose() throws Exception {
+        WebManager webMgr = new WebManager();
+        int numExecutors = 1;
+        int queueSize = 1;
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, queueSize,
+                (reqServer, reqServlet, reqTask) -> reqTask.cancel(true));
+        SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            request(1);
+            synchronized (servlet) {
+                while (servlet.getNumSlept() == 0) {
+                    servlet.wait();
+                }
+            }
+            request(1);
+            waitTillQueued(server, 1);
+            FUTURES.remove(0);
+            HttpRequestTask request = TASKS.remove(0);
+            request.request.abort();
+            waitTillQueued(server, 0);
+            synchronized (servlet) {
+                while (servlet.getNumSlept() == 1) {
+                    servlet.wait();
+                }
+            }
+            servlet.wakeUp();
+            for (Future<Void> f : FUTURES) {
+                f.get();
+            }
+            FUTURES.clear();
+        } finally {
+            webMgr.stop();
+        }
+    }
+
     public static void setPrivateField(Object obj, String filedName, Object value) throws Exception {
         Field f = obj.getClass().getDeclaredField(filedName);
         f.setAccessible(true);
         f.set(obj, value);
     }
 
-    private void request(int count) {
+    private void request(int count) throws URISyntaxException {
         for (int i = 0; i < count; i++) {
-            Future<Void> next = executor.submit(() -> {
-                try {
-                    HttpUriRequest request = post(null);
-                    HttpResponse response = executeHttpRequest(request);
-                    if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
-                        SUCCESS_COUNT.incrementAndGet();
-                    } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE
-                            .code()) {
-                        UNAVAILABLE_COUNT.incrementAndGet();
-                    } else {
-                        OTHER_COUNT.incrementAndGet();
-                    }
-                    InputStream in = response.getEntity().getContent();
-                    if (PRINT_TO_CONSOLE) {
-                        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-                        String line = null;
-                        while ((line = reader.readLine()) != null) {
-                            System.out.println(line);
-                        }
-                    }
-                    IOUtils.closeQuietly(in);
-                } catch (Throwable th) {
-                    // Server closed connection before we complete writing..
-                    EXCEPTION_COUNT.incrementAndGet();
-                }
-                return null;
-            });
+            HttpRequestTask requestTask = new HttpRequestTask();
+            Future<Void> next = executor.submit(requestTask);
             FUTURES.add(next);
+            TASKS.add(requestTask);
         }
     }
-
-    public static HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
-        HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
-        try {
-            return client.execute(method);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    public static HttpUriRequest get(String protocol, String host, int port, String path, String query)
-            throws URISyntaxException {
-        URI uri = new URI(protocol, null, host, port, path, query, null);
-        RequestBuilder builder = RequestBuilder.get(uri);
-        builder.setCharset(StandardCharsets.UTF_8);
-        return builder.build();
-    }
-
-    protected HttpUriRequest post(String query) throws URISyntaxException {
-        URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
-        RequestBuilder builder = RequestBuilder.post(uri);
-        StringBuilder str = new StringBuilder();
-        for (int i = 0; i < 2046; i++) {
-            str.append("This is a string statement that will be ignored");
-            str.append('\n');
-        }
-        String statement = str.toString();
-        builder.setHeader("Content-type", "application/x-www-form-urlencoded");
-        builder.addParameter("statement", statement);
-        builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
-        builder.setCharset(StandardCharsets.UTF_8);
-        return builder.build();
-    }
 }