You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/03/04 17:46:14 UTC
svn commit: r1733615 - in /jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client: SegmentLoaderHandler.java StandbyClient.java StandbyClientHandler.java
Author: frm
Date: Fri Mar 4 16:46:13 2016
New Revision: 1733615
URL: http://svn.apache.org/viewvc?rev=1733615&view=rev
Log:
OAK-4083 - Simplify concurrency of the standby client pipeline
Modified:
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1733615&r1=1733614&r2=1733615&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java Fri Mar 4 16:46:13 2016
@@ -23,13 +23,14 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
@@ -42,51 +43,82 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter
- implements RemoteSegmentLoader {
+public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter implements RemoteSegmentLoader {
- private static final Logger log = LoggerFactory
- .getLogger(SegmentLoaderHandler.class);
+ private static final Logger log = LoggerFactory.getLogger(SegmentLoaderHandler.class);
private final StandbyStore store;
private final String clientID;
private final RecordId head;
- private final EventExecutorGroup loaderExecutor;
private final AtomicBoolean running;
private final int readTimeoutMs;
private final boolean autoClean;
- private ChannelHandlerContext ctx;
+ private volatile ChannelHandlerContext ctx;
- final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
+ private final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
- public SegmentLoaderHandler(final StandbyStore store, RecordId head,
- EventExecutorGroup loaderExecutor,
- String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
+ // Use a separate thread for sync'ing. Leave the I/O thread free to process
+ // I/O requests.
+ private ExecutorService syncExecutor;
+
+ public SegmentLoaderHandler(StandbyStore store, RecordId head, String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
this.store = store;
this.head = head;
- this.loaderExecutor = loaderExecutor;
this.clientID = clientID;
this.running = running;
this.readTimeoutMs = readTimeoutMs;
this.autoClean = autoClean;
+ this.syncExecutor = Executors.newSingleThreadExecutor();
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
- initSync();
}
@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ log.error("Exception caught, closing channel.", cause);
+ close();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ syncExecutor.shutdown();
+ syncExecutor = null;
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof SegmentReply) {
- segment.offer((SegmentReply) evt);
+ onSegmentReply((SegmentReply) evt);
+ }
+
+ if (evt instanceof String) {
+ onCommand((String) evt);
}
}
- private void initSync() {
+ private void onSegmentReply(SegmentReply reply) {
+ // Offer the reply from the I/O thread, unblocking the sync thread.
+ segment.offer(reply);
+ }
+
+ private void onCommand(String command) {
+ if (command.equals("sync")) {
+ syncExecutor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ sync();
+ }
+
+ });
+ }
+ }
+
+ private void sync() {
log.debug("new head id " + head);
long t = System.currentTimeMillis();
long preSyncSize = -1;
@@ -142,23 +174,20 @@ public class SegmentLoaderHandler extend
@Override
public Segment readSegment(final String id) {
+ // Use the I/O thread to write the request to the server
ctx.writeAndFlush(newGetSegmentReq(this.clientID, id));
+ // Wait on the sync thread for the response.
return getSegment(id);
}
@Override
public Blob readBlob(String blobId) {
+ // Use the I/O thread to write the request to the server
ctx.writeAndFlush(newGetBlobReq(this.clientID, blobId));
+ // Wait on the sync thread for the response.
return getBlob(blobId);
}
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- log.error("Exception caught, closing channel.", cause);
- close();
- }
-
private Segment getSegment(final String id) {
return getReply(id, SegmentReply.SEGMENT).getSegment();
}
@@ -172,12 +201,14 @@ public class SegmentLoaderHandler extend
try {
for (;;) {
try {
- SegmentReply r = segment.poll(readTimeoutMs,
- TimeUnit.MILLISECONDS);
+ // Block the sync thread for a response from the server.
+ SegmentReply r = segment.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
+
if (r == null) {
log.warn("timeout waiting for {}", id);
return SegmentReply.empty();
}
+
if (r.getType() == type) {
switch (r.getType()) {
case SegmentReply.SEGMENT:
@@ -211,8 +242,7 @@ public class SegmentLoaderHandler extend
@Override
public boolean isClosed() {
- return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
- .isShutdown()));
+ return !ctx.channel().isActive();
}
@Override
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1733615&r1=1733614&r2=1733615&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java Fri Mar 4 16:46:13 2016
@@ -44,8 +44,6 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean;
@@ -70,7 +68,6 @@ public final class StandbyClient impleme
private final CommunicationObserver observer;
private StandbyClientHandler handler;
private EventLoopGroup group;
- private EventExecutorGroup executor;
private SslContext sslContext;
private boolean active = false;
private int failedRequests;
@@ -142,7 +139,6 @@ public final class StandbyClient impleme
return;
}
state = STATUS_STARTING;
- executor = new DefaultEventExecutorGroup(4);
handler = new StandbyClientHandler(this.store, observer, running,
readTimeoutMs, autoClean);
group = new NioEventLoopGroup();
@@ -167,7 +163,7 @@ public final class StandbyClient impleme
p.addLast(new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new SnappyFramedDecoder(true));
p.addLast(new RecordIdDecoder(store));
- p.addLast(executor, handler);
+ p.addLast(handler);
}
});
state = STATUS_RUNNING;
@@ -200,13 +196,9 @@ public final class StandbyClient impleme
handler.close();
handler = null;
}
- if (executor != null && !executor.isShuttingDown()) {
- executor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
if (group != null && !group.isShuttingDown()) {
- group.shutdownGracefully(0, 1, TimeUnit.SECONDS)
- .syncUninterruptibly();
+ group.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
+ group = null;
}
}
Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1733615&r1=1733614&r2=1733615&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java Fri Mar 4 16:46:13 2016
@@ -21,14 +21,11 @@ package org.apache.jackrabbit.oak.plugin
import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq;
import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
@@ -37,8 +34,7 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId>
- implements Closeable {
+public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId> implements Closeable {
private static final Logger log = LoggerFactory
.getLogger(StandbyClientHandler.class);
@@ -49,11 +45,7 @@ public class StandbyClientHandler extend
private final int readTimeoutMs;
private final boolean autoClean;
- private EventExecutorGroup loaderExecutor;
-
- public StandbyClientHandler(final StandbyStore store,
- CommunicationObserver observer, AtomicBoolean running,
- int readTimeoutMs, boolean autoClean) {
+ public StandbyClientHandler(final StandbyStore store, CommunicationObserver observer, AtomicBoolean running, int readTimeoutMs, boolean autoClean) {
this.store = store;
this.observer = observer;
this.running = running;
@@ -69,8 +61,7 @@ public class StandbyClientHandler extend
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
- throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, RecordId msg) throws Exception {
setHead(ctx, msg);
};
@@ -80,7 +71,6 @@ public class StandbyClientHandler extend
}
synchronized void setHead(ChannelHandlerContext ctx, RecordId head) {
-
if (store.getHead().getRecordId().equals(head)) {
// all sync'ed up
log.debug("no changes on sync.");
@@ -93,28 +83,19 @@ public class StandbyClientHandler extend
ctx.pipeline().remove(this);
ctx.pipeline().addLast(new ReplyDecoder(store));
- loaderExecutor = new DefaultEventExecutorGroup(4);
- SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
- loaderExecutor, this.observer.getID(), running, readTimeoutMs,
- autoClean);
- ctx.pipeline().addLast(loaderExecutor, h2);
-
- h2.channelActive(ctx);
- log.debug("updating current head finished");
+ ctx.pipeline().addLast(new SegmentLoaderHandler(store, head, this.observer.getID(), running, readTimeoutMs, autoClean));
+ ctx.pipeline().fireUserEventTriggered("sync");
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Exception caught, closing channel.", cause);
close();
}
@Override
- public synchronized void close() {
- if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
- loaderExecutor.shutdownGracefully(0, 1, TimeUnit.SECONDS)
- .syncUninterruptibly();
- }
+ public void close() {
+ // This handler doesn't own resources to release
}
+
}