You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/02/18 19:51:13 UTC

asterixdb git commit: [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops

Repository: asterixdb
Updated Branches:
  refs/heads/master 5259810fe -> 9b1cd7968


[ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, when the HTTP server channel dropped unexpectedly, the
  server did nothing.
- After this change, the HTTP server logs the event and keeps trying
  to re-bind to the port until it either succeeds or
  server.stop() is invoked.

Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2382
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9b1cd796
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9b1cd796
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9b1cd796

Branch: refs/heads/master
Commit: 9b1cd7968298f38cd17483b91c8ea4a56d2b17bd
Parents: 5259810
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Feb 14 15:10:35 2018 -0800
Committer: Michael Blow <mb...@apache.org>
Committed: Sun Feb 18 11:50:46 2018 -0800

----------------------------------------------------------------------
 .../apache/hyracks/http/server/HttpServer.java  | 87 +++++++++++++++++++-
 .../hyracks/http/test/HttpServerTest.java       | 62 ++++++++++++++
 2 files changed, 147 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b1cd796/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 19436ab..8ce1d70 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -73,7 +73,8 @@ public class HttpServer {
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
-    private Channel channel;
+    private volatile Thread recoveryThread;
+    private volatile Channel channel;
     private Throwable cause;
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
@@ -134,6 +135,14 @@ public class HttpServer {
                 throw e;
             }
         }
+        // Should wait for the recovery thread outside synchronized block
+        Thread rt = recoveryThread;
+        if (rt != null) {
+            rt.join(TimeUnit.SECONDS.toMillis(5));
+            if (recoveryThread != null) {
+                LOGGER.log(Level.ERROR, "Failure stopping recovery thread of {}", this);
+            }
+        }
     }
 
     /**
@@ -209,6 +218,10 @@ public class HttpServer {
          * Note that it doesn't work for the case where multiple paths map to a single IServlet
          */
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
+        channel = bind();
+    }
+
+    private Channel bind() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -216,10 +229,74 @@ public class HttpServer {
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
-        channel = b.bind(port).sync().channel();
+        Channel newChannel = b.bind(port).sync().channel();
+        newChannel.closeFuture().addListener(f -> {
+            // This listener is invoked from within a netty IO thread. Hence, we can never block it
+            // For simplicity, we will submit the recovery task to a different thread
+            synchronized (lock) {
+                if (state != STARTED) {
+                    return;
+                }
+                LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
+                triggerRecovery();
+            }
+        });
+        return newChannel;
+    }
+
+    private void triggerRecovery() {
+        Thread rt = recoveryThread;
+        if (rt != null) {
+            try {
+                rt.join();
+            } catch (InterruptedException e) {
+                LOGGER.log(Level.WARN, this + " recovery was interrupted", e);
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+        // try to revive the channel
+        recoveryThread = new Thread(this::recover);
+        recoveryThread.start();
+    }
+
+    public void recover() {
+        try {
+            synchronized (lock) {
+                while (state == STARTED) {
+                    try {
+                        channel = bind();
+                        break;
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e);
+                        setFailed(e);
+                        Thread.currentThread().interrupt();
+                    } catch (Throwable th) {
+                        // sleep for 5s
+                        LOGGER.log(Level.WARN, this + " failed server recovery attempt. "
+                                + "Sleeping for 5s before starting the next attempt", th);
+                        try {
+                            // Wait on lock to allow stop request to be executed
+                            lock.wait(TimeUnit.SECONDS.toMillis(5));
+                        } catch (InterruptedException e) {
+                            LOGGER.log(Level.WARN, this + " interrupted while attempting to revive server channel", e);
+                            setFailed(e);
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            }
+        } finally {
+            recoveryThread = null;
+        }
     }
 
     protected void doStop() throws InterruptedException {
+        // stop recovery if it was ongoing
+        Thread rt = recoveryThread;
+        if (rt != null) {
+            rt.interrupt();
+        }
         // stop taking new requests
         executor.shutdown();
         try {
@@ -300,4 +377,10 @@ public class HttpServer {
     public int getWorkQueueSize() {
         return workQueue.size();
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"port\":" + port + ",\"state\":\"" + getState()
+                + "\"}";
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9b1cd796/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 9067586..298d2de 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -48,13 +48,18 @@ import org.apache.hyracks.http.server.WebManager;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.hyracks.http.servlet.ChattyServlet;
 import org.apache.hyracks.http.servlet.SleepyServlet;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import io.netty.channel.Channel;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class HttpServerTest {
+    private static final Logger LOGGER = LogManager.getLogger();
     static final boolean PRINT_TO_CONSOLE = false;
     static final int PORT = 9898;
     static final String HOST = "localhost";
@@ -241,6 +246,63 @@ public class HttpServerTest {
         }
     }
 
+    @Test
+    public void testServerRevival() throws Exception {
+        int numExecutors = 16;
+        int serverQueueSize = 16;
+        int numRequests = 1;
+        WebManager webMgr = new WebManager();
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+        ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            // send a request
+            request(numRequests);
+            for (Future<Void> thread : FUTURES) {
+                thread.get();
+            }
+            Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
+            // close the channel
+            Field channelField = server.getClass().getDeclaredField("channel");
+            channelField.setAccessible(true);
+            Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
+            recoveryThreadField.setAccessible(true);
+            Channel channel = (Channel) channelField.get(server);
+            channel.close();
+            Thread.sleep(1000);
+            final int sleeps = 10;
+            for (int i = 0; i < sleeps; i++) {
+                Thread thread = (Thread) recoveryThreadField.get(server);
+                if (thread == null) {
+                    break;
+                }
+                LOGGER.log(Level.WARN,
+                        "Attempt #" + (i + 1) + ". Recovery thread is not null and has id " + thread.getId());
+                if (i == sleeps - 1) {
+                    throw new Exception("Http server recovery didn't complete after " + sleeps + "s");
+                }
+                Thread.sleep(1000);
+            }
+            for (int i = 0; i < sleeps; i++) {
+                request(1);
+                for (Future<Void> thread : FUTURES) {
+                    thread.get();
+                }
+                if (numRequests + 1 == SUCCESS_COUNT.get()) {
+                    break;
+                } else if (i == sleeps - 1) {
+                    throw new Exception(
+                            "Http server couldn't process requests correctly after recovery for " + sleeps + "s");
+                }
+            }
+        } finally {
+            webMgr.stop();
+        }
+    }
+
     public static void setPrivateField(Object obj, String filedName, Object value) throws Exception {
         Field f = obj.getClass().getDeclaredField(filedName);
         f.setAccessible(true);