You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2021/01/27 01:07:49 UTC

[asterixdb] 03/11: [NO ISSUE][OTH] Support multiple addresses in http servers

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 572171e51c6c52b26d4e7b58a1d0940a232d09d6
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Fri Jan 15 16:01:35 2021 +0300

    [NO ISSUE][OTH] Support multiple addresses in http servers
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Allow binding http servers to multiple addresses.
    - Add test cases.
    
    Change-Id: I68f25dc5af471c7ded29f27405c311947a007947
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9624
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 .../org/apache/hyracks/http/server/HttpServer.java | 105 ++++++++++++++-------
 .../apache/hyracks/test/http/HttpServerTest.java   |  49 +++++++++-
 2 files changed, 116 insertions(+), 38 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 93762a6..97b8859 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -20,6 +20,9 @@ package org.apache.hyracks.http.server;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -39,6 +42,7 @@ import org.apache.logging.log4j.Logger;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -50,6 +54,8 @@ import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpScheme;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 public class HttpServer {
     // Constants
@@ -64,6 +70,7 @@ public class HttpServer {
     private static final int STARTING = 1;
     private static final int STARTED = 2;
     private static final int STOPPING = 3;
+    private static final int RECOVERING = 4;
     // Final members
     private final IChannelClosedHandler closedHandler;
     private final Object lock = new Object();
@@ -73,38 +80,59 @@ public class HttpServer {
     private final ServletRegistry servlets;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
-    private final InetSocketAddress address;
+    private final InetSocketAddress defaultAddress;
+    private final List<InetSocketAddress> addresses;
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
     private volatile Thread recoveryThread;
-    private volatile Channel channel;
+    private final List<Channel> channels;
     private Throwable cause;
     private HttpServerConfig config;
 
+    private final GenericFutureListener<Future<Void>> channelCloseListener = f -> {
+        // This listener is invoked from within a netty IO thread. Hence, we can never block it
+        // For simplicity, we will submit the recovery task to a different thread. We will also
+        // close all channels on this server and attempt to rebind them.
+        synchronized (lock) {
+            if (state != STARTED) {
+                return;
+            }
+            LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
+            MXHelper.logFileDescriptors();
+            state = RECOVERING;
+            triggerRecovery();
+        }
+    };
+
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) {
-        this(bossGroup, workerGroup, new InetSocketAddress(port), config, null);
+        this(bossGroup, workerGroup, Collections.singletonList(new InetSocketAddress(port)), config, null);
     }
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
-            HttpServerConfig config) {
-        this(bossGroup, workerGroup, address, config, null);
+            HttpServerConfig config, IChannelClosedHandler closeHandler) {
+        this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler);
     }
 
-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses,
             HttpServerConfig config, IChannelClosedHandler closeHandler) {
+        if (addresses.isEmpty()) {
+            throw new IllegalArgumentException("no addresses specified");
+        }
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
-        this.address = address;
+        this.addresses = addresses;
+        defaultAddress = addresses.get(0);
         this.closedHandler = closeHandler;
         this.config = config;
+        channels = new ArrayList<>();
         ctx = new ConcurrentHashMap<>();
         servlets = new ServletRegistry();
         workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
         int numExecutorThreads = config.getThreadCount();
         executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
                 runnable -> new Thread(runnable,
-                        "HttpExecutor(port:" + address.getPort() + ")-" + threadId.getAndIncrement()));
+                        "HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement()));
         long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
                 + numExecutorThreads * config.getMaxResponseChunkSize();
         LOGGER.log(Level.DEBUG,
@@ -128,7 +156,7 @@ public class HttpServer {
                 doStart();
                 setStarted();
             } catch (Throwable e) { // NOSONAR
-                LOGGER.error("Failure starting an Http Server at: {}", address, e);
+                LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e);
                 setFailed(e);
                 throw e;
             }
@@ -175,6 +203,8 @@ public class HttpServer {
                 return "STOPPING";
             case STOPPED:
                 return "STOPPED";
+            case RECOVERING:
+                return "RECOVERING";
             default:
                 return "UNKNOWN";
         }
@@ -229,10 +259,10 @@ public class HttpServer {
         for (IServlet servlet : servlets.getServlets()) {
             servlet.init();
         }
-        channel = bind();
+        bind();
     }
 
-    private Channel bind() throws InterruptedException {
+    private void bind() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -240,23 +270,20 @@ public class HttpServer {
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
-        Channel newChannel = b.bind(address).sync().channel();
-        newChannel.closeFuture().addListener(f -> {
-            // This listener is invoked from within a netty IO thread. Hence, we can never block it
-            // For simplicity, we will submit the recovery task to a different thread
+        List<ChannelFuture> channelFutures = new ArrayList<>();
+        for (InetSocketAddress address : addresses) {
+            channelFutures.add(b.bind(address));
+        }
+        for (ChannelFuture future : channelFutures) {
+            Channel channel = future.sync().channel();
+            channel.closeFuture().addListener(channelCloseListener);
             synchronized (lock) {
-                if (state != STARTED) {
-                    return;
-                }
-                LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
-                MXHelper.logFileDescriptors();
-                triggerRecovery();
+                channels.add(channel);
             }
-        });
-        return newChannel;
+        }
     }
 
-    private void triggerRecovery() {
+    private void triggerRecovery() throws InterruptedException {
         Thread rt = recoveryThread;
         if (rt != null) {
             try {
@@ -267,7 +294,7 @@ public class HttpServer {
                 return;
             }
         }
-        // try to revive the channel
+        // try to revive the channels
         recoveryThread = new Thread(this::recover);
         recoveryThread.start();
     }
@@ -275,9 +302,11 @@ public class HttpServer {
     public void recover() {
         try {
             synchronized (lock) {
-                while (state == STARTED) {
+                while (state == RECOVERING) {
                     try {
-                        channel = bind();
+                        closeChannels();
+                        bind();
+                        setStarted();
                         break;
                     } catch (InterruptedException e) {
                         LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e);
@@ -329,10 +358,7 @@ public class HttpServer {
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Error while shutting down http server executor", e);
         }
-        if (channel != null) {
-            channel.close();
-            channel.closeFuture().sync();
-        }
+        closeChannels();
     }
 
     public IServlet getServlet(FullHttpRequest request) {
@@ -369,8 +395,8 @@ public class HttpServer {
 
     @Override
     public String toString() {
-        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + address + ",\"state\":\"" + getState()
-                + "\"}";
+        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\""
+                + getState() + "\"}";
     }
 
     public HttpServerConfig getConfig() {
@@ -378,6 +404,17 @@ public class HttpServer {
     }
 
     public InetSocketAddress getAddress() {
-        return address;
+        return defaultAddress;
+    }
+
+    private void closeChannels() throws InterruptedException {
+        synchronized (lock) {
+            for (Channel channel : channels) {
+                channel.closeFuture().removeListener(channelCloseListener);
+                channel.close();
+                channel.closeFuture().sync();
+            }
+            channels.clear();
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
index 84c8c65..76438aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
@@ -28,6 +28,7 @@ import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -263,7 +264,10 @@ public class HttpServerTest {
         WebManager webMgr = new WebManager();
         final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(numExecutors)
                 .setRequestQueueSize(serverQueueSize).build();
-        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config);
+        List<InetSocketAddress> addresses = new ArrayList<>();
+        addresses.add(new InetSocketAddress(PORT));
+        addresses.add(new InetSocketAddress(PORT + 1));
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
         ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
@@ -276,12 +280,12 @@ public class HttpServerTest {
             }
             Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
             // close the channel
-            Field channelField = server.getClass().getDeclaredField("channel");
+            Field channelField = server.getClass().getDeclaredField("channels");
             channelField.setAccessible(true);
             Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
             recoveryThreadField.setAccessible(true);
-            Channel channel = (Channel) channelField.get(server);
-            channel.close();
+            List<Channel> channels = (ArrayList<Channel>) channelField.get(server);
+            channels.get(0).close();
             Thread.sleep(1000);
             final int sleeps = 10;
             for (int i = 0; i < sleeps; i++) {
@@ -409,6 +413,43 @@ public class HttpServerTest {
         }
     }
 
+    @Test
+    public void multiAddressServerTest() throws Exception {
+        final WebManager webMgr = new WebManager();
+        final HttpServerConfig config =
+                HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
+        List<Integer> ports = Arrays.asList(PORT, PORT + 1);
+        List<InetSocketAddress> addresses = new ArrayList<>();
+        for (Integer port : ports) {
+            addresses.add(new InetSocketAddress(port));
+        }
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
+        EchoServlet servlet = new EchoServlet(server.ctx(), PATH);
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            for (Integer port : ports) {
+                try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+                    final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, port,
+                            HttpServerTest.PATH, null, null);
+                    final HttpPost postRequest = new HttpPost(uri);
+                    final String requestBody = "test";
+                    final StringEntity chunkedEntity = new StringEntity(requestBody);
+                    chunkedEntity.setChunked(true);
+                    postRequest.setEntity(chunkedEntity);
+                    try (CloseableHttpResponse response = httpClient.execute(postRequest)) {
+                        final String responseBody = EntityUtils.toString(response.getEntity());
+                        Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpResponseStatus.OK.code());
+                        Assert.assertEquals(responseBody, requestBody);
+                    }
+                }
+            }
+        } finally {
+            webMgr.stop();
+        }
+    }
+
     private void request(int count) throws URISyntaxException {
         request(count, 0);
     }