You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/03/04 17:46:14 UTC

svn commit: r1733615 - in /jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client: SegmentLoaderHandler.java StandbyClient.java StandbyClientHandler.java

Author: frm
Date: Fri Mar  4 16:46:13 2016
New Revision: 1733615

URL: http://svn.apache.org/viewvc?rev=1733615&view=rev
Log:
OAK-4083 - Simplify concurrency of the standby client pipeline

Modified:
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1733615&r1=1733614&r2=1733615&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java Fri Mar  4 16:46:13 2016
@@ -23,13 +23,14 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
@@ -42,51 +43,82 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter
-        implements RemoteSegmentLoader {
+public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter implements RemoteSegmentLoader {
 
-    private static final Logger log = LoggerFactory
-            .getLogger(SegmentLoaderHandler.class);
+    private static final Logger log = LoggerFactory.getLogger(SegmentLoaderHandler.class);
 
     private final StandbyStore store;
     private final String clientID;
     private final RecordId head;
-    private final EventExecutorGroup loaderExecutor;
     private final AtomicBoolean running;
     private final int readTimeoutMs;
     private final boolean autoClean;
 
-    private ChannelHandlerContext ctx;
+    private volatile ChannelHandlerContext ctx;
 
-    final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
+    private final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
 
-    public SegmentLoaderHandler(final StandbyStore store, RecordId head,
-            EventExecutorGroup loaderExecutor,
-            String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
+    // Use a separate thread for sync'ing. Leave the I/O thread free to process
+    // I/O requests.
+    private ExecutorService syncExecutor;
+
+    public SegmentLoaderHandler(StandbyStore store, RecordId head, String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
         this.store = store;
         this.head = head;
-        this.loaderExecutor = loaderExecutor;
         this.clientID = clientID;
         this.running = running;
         this.readTimeoutMs = readTimeoutMs;
         this.autoClean = autoClean;
+        this.syncExecutor = Executors.newSingleThreadExecutor();
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
         this.ctx = ctx;
-        initSync();
     }
 
     @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
-            throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.error("Exception caught, closing channel.", cause);
+        close();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        syncExecutor.shutdown();
+        syncExecutor = null;
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         if (evt instanceof SegmentReply) {
-            segment.offer((SegmentReply) evt);
+            onSegmentReply((SegmentReply) evt);
+        }
+
+        if (evt instanceof String) {
+            onCommand((String) evt);
         }
     }
 
-    private void initSync() {
+    private void onSegmentReply(SegmentReply reply) {
+        // Offer the reply from the I/O thread, unblocking the sync thread.
+        segment.offer(reply);
+    }
+
+    private void onCommand(String command) {
+        if (command.equals("sync")) {
+            syncExecutor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    sync();
+                }
+
+            });
+        }
+    }
+
+    private void sync() {
         log.debug("new head id " + head);
         long t = System.currentTimeMillis();
         long preSyncSize = -1;
@@ -142,23 +174,20 @@ public class SegmentLoaderHandler extend
 
     @Override
     public Segment readSegment(final String id) {
+        // Use the I/O thread to write the request to the server
         ctx.writeAndFlush(newGetSegmentReq(this.clientID, id));
+        // Wait on the sync thread for the response.
         return getSegment(id);
     }
 
     @Override
     public Blob readBlob(String blobId) {
+        // Use the I/O thread to write the request to the server
         ctx.writeAndFlush(newGetBlobReq(this.clientID, blobId));
+        // Wait on the sync thread for the response.
         return getBlob(blobId);
     }
 
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-            throws Exception {
-        log.error("Exception caught, closing channel.", cause);
-        close();
-    }
-
     private Segment getSegment(final String id) {
         return getReply(id, SegmentReply.SEGMENT).getSegment();
     }
@@ -172,12 +201,14 @@ public class SegmentLoaderHandler extend
         try {
             for (;;) {
                 try {
-                    SegmentReply r = segment.poll(readTimeoutMs,
-                            TimeUnit.MILLISECONDS);
+                    // Block the sync thread for a response from the server.
+                    SegmentReply r = segment.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
+
                     if (r == null) {
                         log.warn("timeout waiting for {}", id);
                         return SegmentReply.empty();
                     }
+
                     if (r.getType() == type) {
                         switch (r.getType()) {
                         case SegmentReply.SEGMENT:
@@ -211,8 +242,7 @@ public class SegmentLoaderHandler extend
 
     @Override
     public boolean isClosed() {
-        return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
-                .isShutdown()));
+        return !ctx.channel().isActive();
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1733615&r1=1733614&r2=1733615&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java Fri Mar  4 16:46:13 2016
@@ -44,8 +44,6 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean;
@@ -70,7 +68,6 @@ public final class StandbyClient impleme
     private final CommunicationObserver observer;
     private StandbyClientHandler handler;
     private EventLoopGroup group;
-    private EventExecutorGroup executor;
     private SslContext sslContext;
     private boolean active = false;
     private int failedRequests;
@@ -142,7 +139,6 @@ public final class StandbyClient impleme
                 return;
             }
             state = STATUS_STARTING;
-            executor = new DefaultEventExecutorGroup(4);
             handler = new StandbyClientHandler(this.store, observer, running,
                     readTimeoutMs, autoClean);
             group = new NioEventLoopGroup();
@@ -167,7 +163,7 @@ public final class StandbyClient impleme
                     p.addLast(new StringEncoder(CharsetUtil.UTF_8));
                     p.addLast(new SnappyFramedDecoder(true));
                     p.addLast(new RecordIdDecoder(store));
-                    p.addLast(executor, handler);
+                    p.addLast(handler);
                 }
             });
             state = STATUS_RUNNING;
@@ -200,13 +196,9 @@ public final class StandbyClient impleme
             handler.close();
             handler = null;
         }
-        if (executor != null && !executor.isShuttingDown()) {
-            executor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-        }
         if (group != null && !group.isShuttingDown()) {
-            group.shutdownGracefully(0, 1, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
+            group.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
+            group = null;
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1733615&r1=1733614&r2=1733615&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java Fri Mar  4 16:46:13 2016
@@ -21,14 +21,11 @@ package org.apache.jackrabbit.oak.plugin
 import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq;
 
 import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
@@ -37,8 +34,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId>
-        implements Closeable {
+public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId> implements Closeable {
 
     private static final Logger log = LoggerFactory
             .getLogger(StandbyClientHandler.class);
@@ -49,11 +45,7 @@ public class StandbyClientHandler extend
     private final int readTimeoutMs;
     private final boolean autoClean;
 
-    private EventExecutorGroup loaderExecutor;
-
-    public StandbyClientHandler(final StandbyStore store,
-            CommunicationObserver observer, AtomicBoolean running,
-            int readTimeoutMs, boolean autoClean) {
+    public StandbyClientHandler(final StandbyStore store, CommunicationObserver observer, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
         this.store = store;
         this.observer = observer;
         this.running = running;
@@ -69,8 +61,7 @@ public class StandbyClientHandler extend
     }
 
     @Override
-    protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
-            throws Exception {
+    protected void channelRead0(ChannelHandlerContext ctx, RecordId msg) throws Exception {
         setHead(ctx, msg);
     };
 
@@ -80,7 +71,6 @@ public class StandbyClientHandler extend
     }
 
     synchronized void setHead(ChannelHandlerContext ctx, RecordId head) {
-
         if (store.getHead().getRecordId().equals(head)) {
             // all sync'ed up
             log.debug("no changes on sync.");
@@ -93,28 +83,19 @@ public class StandbyClientHandler extend
         ctx.pipeline().remove(this);
         ctx.pipeline().addLast(new ReplyDecoder(store));
 
-        loaderExecutor = new DefaultEventExecutorGroup(4);
-        SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
-                loaderExecutor, this.observer.getID(), running, readTimeoutMs,
-                autoClean);
-        ctx.pipeline().addLast(loaderExecutor, h2);
-
-        h2.channelActive(ctx);
-        log.debug("updating current head finished");
+        ctx.pipeline().addLast(new SegmentLoaderHandler(store, head, this.observer.getID(), running, readTimeoutMs, autoClean));
+        ctx.pipeline().fireUserEventTriggered("sync");
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-            throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         log.error("Exception caught, closing channel.", cause);
         close();
     }
 
     @Override
-    public synchronized void close() {
-        if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
-            loaderExecutor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-        }
+    public void close() {
+        // This handler doesn't own resources to release
     }
+
 }