You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/08 12:41:35 UTC
svn commit: r1759799 -
/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
Author: frm
Date: Thu Sep 8 12:41:35 2016
New Revision: 1759799
URL: http://svn.apache.org/viewvc?rev=1759799&view=rev
Log:
OAK-4737 - Make the "start" method properly synchronous
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1759799&r1=1759798&r2=1759799&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Thu Sep 8 12:41:35 2016
@@ -32,7 +32,6 @@ import javax.net.ssl.SSLException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -46,15 +45,12 @@ import io.netty.handler.codec.string.Str
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.Future;
-import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder;
import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
-import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,70 +156,51 @@ public class StandbyServer implements St
handler.state = STATUS_CLOSED;
}
- private void start(boolean wait) {
- if (running) return;
+ @Override
+ public void start() {
+ if (running) {
+ return;
+ }
- this.handler.state = STATUS_STARTING;
+ handler.state = STATUS_STARTING;
- final Thread close = new Thread() {
- @Override
- public void run() {
- try {
- running = true;
- handler.state = STATUS_RUNNING;
- channelFuture.sync().channel().closeFuture().sync();
- } catch (InterruptedException e) {
- StandbyServer.this.stop();
- }
- }
- };
- final ChannelFutureListener bindListener = new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- close.start();
- } else {
- log.error("Server failed to start on port " + port
- + ", will be canceled", future.cause());
- future.channel().close();
- new Thread() {
- @Override
- public void run() {
- close();
- }
- }.start();
- }
- }
- };
- Future<?> startup = bossGroup.submit(new Runnable() {
- @Override
- public void run() {
- //netty 4.0.20 has a race condition issue with
- //asynchronous channel registration. As a workaround
- //we bind asynchronously from the boss event group to make
- //the channel registration synchronous.
- //Note that now this method will return immediately.
- channelFuture = b.bind(port);
- channelFuture.addListener(bindListener);
- }
- });
- if (!startup.awaitUninterruptibly(10000)) {
- log.error("Server failed to start within 10 seconds and will be canceled");
- startup.cancel(true);
- } else if (wait) {
- try {
- close.join();
- } catch (InterruptedException ignored) {}
+ channelFuture = b.bind(port);
+
+ if (channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS)) {
+ onTimelyStart();
+ } else {
+ onStartTimeOut();
}
}
- public void startAndWait() {
- start(true);
+ private void onTimelyStart() {
+ if (channelFuture.isSuccess()) {
+ onSuccessfulStart();
+ }
+
+ if (channelFuture.cause() != null) {
+ onUnsuccessfulStart();
+ }
}
- @Override
- public void start() {
- start(false);
+ private void onSuccessfulStart() {
+ log.debug("Binding was successful");
+ handler.state = STATUS_RUNNING;
+ running = true;
+ }
+
+ private void onUnsuccessfulStart() {
+ log.debug("Binding was unsuccessful", channelFuture.cause());
+ handler.state = null;
+ running = false;
+ throw new RuntimeException(channelFuture.cause());
+ }
+
+ private void onStartTimeOut() {
+ log.debug("Binding timed out, canceling");
+ handler.state = null;
+ running = false;
+ channelFuture.cancel(true);
}
@Override