You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/08 12:41:35 UTC

svn commit: r1759799 - /jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java

Author: frm
Date: Thu Sep  8 12:41:35 2016
New Revision: 1759799

URL: http://svn.apache.org/viewvc?rev=1759799&view=rev
Log:
OAK-4737 - Make the "start" method properly synchronous

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1759799&r1=1759798&r2=1759799&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Thu Sep  8 12:41:35 2016
@@ -32,7 +32,6 @@ import javax.net.ssl.SSLException;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -46,15 +45,12 @@ import io.netty.handler.codec.string.Str
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.Future;
-import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder;
 import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
 import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
-import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,70 +156,51 @@ public class StandbyServer implements St
         handler.state = STATUS_CLOSED;
     }
 
-    private void start(boolean wait) {
-        if (running) return;
+    @Override
+    public void start() {
+        if (running) {
+            return;
+        }
 
-        this.handler.state = STATUS_STARTING;
+        handler.state = STATUS_STARTING;
 
-        final Thread close = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    running = true;
-                    handler.state = STATUS_RUNNING;
-                    channelFuture.sync().channel().closeFuture().sync();
-                } catch (InterruptedException e) {
-                    StandbyServer.this.stop();
-                }
-            }
-        };
-        final ChannelFutureListener bindListener = new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) {
-                if (future.isSuccess()) {
-                    close.start();
-                } else {
-                    log.error("Server failed to start on port " + port
-                            + ", will be canceled", future.cause());
-                    future.channel().close();
-                    new Thread() {
-                        @Override
-                        public void run() {
-                            close();
-                        }
-                    }.start();
-                }
-            }
-        };
-        Future<?> startup = bossGroup.submit(new Runnable() {
-            @Override
-            public void run() {
-                //netty 4.0.20 has a race condition issue with
-                //asynchronous channel registration. As a workaround
-                //we bind asynchronously from the boss event group to make
-                //the channel registration synchronous.
-                //Note that now this method will return immediately.
-                channelFuture = b.bind(port);
-                channelFuture.addListener(bindListener);
-            }
-        });
-        if (!startup.awaitUninterruptibly(10000)) {
-            log.error("Server failed to start within 10 seconds and will be canceled");
-            startup.cancel(true);
-        } else if (wait) {
-            try {
-                close.join();
-            } catch (InterruptedException ignored) {}
+        channelFuture = b.bind(port);
+
+        if (channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS)) {
+            onTimelyStart();
+        } else {
+            onStartTimeOut();
         }
     }
 
-    public void startAndWait() {
-        start(true);
+    private void onTimelyStart() {
+        if (channelFuture.isSuccess()) {
+            onSuccessfulStart();
+        }
+
+        if (channelFuture.cause() != null) {
+            onUnsuccessfulStart();
+        }
     }
 
-    @Override
-    public void start() {
-        start(false);
+    private void onSuccessfulStart() {
+        log.debug("Binding was successful");
+        handler.state = STATUS_RUNNING;
+        running = true;
+    }
+
+    private void onUnsuccessfulStart() {
+        log.debug("Binding was unsuccessful", channelFuture.cause());
+        handler.state = null;
+        running = false;
+        throw new RuntimeException(channelFuture.cause());
+    }
+
+    private void onStartTimeOut() {
+        log.debug("Binding timed out, canceling");
+        handler.state = null;
+        running = false;
+        channelFuture.cancel(true);
     }
 
     @Override