You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/10/20 15:18:53 UTC

svn commit: r1765831 - in /jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment: standby/ test/proxy/

Author: frm
Date: Thu Oct 20 15:18:53 2016
New Revision: 1765831

URL: http://svn.apache.org/viewvc?rev=1765831&view=rev
Log:
OAK-4958 - Improve consistency of NetworkErrorProxy

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java
      - copied, changed from r1765814, jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java?rev=1765831&r1=1765830&r2=1765831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java Thu Oct 20 15:18:53 2016
@@ -30,6 +30,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
 import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
 import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore;
+import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -138,7 +139,7 @@ public class BrokenNetworkIT extends Tes
         ) {
             p.skipBytes(skipPosition, skipBytes);
             p.flipByte(flipPosition);
-            p.run();
+            p.connect();
 
             serverSync.start();
 

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1765831&r1=1765830&r2=1765831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java Thu Oct 20 15:18:53 2016
@@ -26,28 +26,23 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
 
 import com.google.common.io.ByteStreams;
-import org.apache.jackrabbit.core.data.FileDataStore;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
 import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
 import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
+import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
 import org.junit.Test;
 
 public abstract class DataStoreTestBase extends TestBase {
@@ -156,7 +151,7 @@ public abstract class DataStoreTestBase
         ) {
             p.skipBytes(skipPosition, skipBytes);
             p.flipByte(flipPosition);
-            p.run();
+            p.connect();
 
             serverSync.start();
             primary.flush();

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BackwardHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger log = LoggerFactory.getLogger(BackwardHandler.class);
+
+    private final Channel target;
+
+    BackwardHandler(Channel target) {
+        this.target = target;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        target.write(msg);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        target.flush();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.error("Unexpected error, closing channel", cause);
+        target.close();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+class FlipHandler extends ChannelInboundHandlerAdapter {
+
+    private int current;
+
+    private int flip;
+
+    FlipHandler(int pos) {
+        this.flip = pos;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf) {
+            onByteBuf((ByteBuf) msg);
+        }
+        ctx.fireChannelRead(msg);
+    }
+
+    private void onByteBuf(ByteBuf b) {
+        int i = flip - current;
+        if (0 <= i && i < b.readableBytes()) {
+            b.setByte(i, ~b.getByte(i));
+        }
+        current += b.readableBytes();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ForwardHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger log = LoggerFactory.getLogger(ForwardHandler.class);
+
+    private final String targetHost;
+
+    private final int targetPort;
+
+    private final int skipPosition;
+
+    private final int skipBytes;
+
+    private final int flipPosition;
+
+    private Channel remote;
+
+    private EventLoopGroup group;
+
+    ForwardHandler(String host, int port, int flipPosition, int skipPosition, int skipBytes) {
+        this.targetHost = host;
+        this.targetPort = port;
+        this.flipPosition = flipPosition;
+        this.skipPosition = skipPosition;
+        this.skipBytes = skipBytes;
+    }
+
+    @Override
+    public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
+        group = new NioEventLoopGroup();
+        Bootstrap b = new Bootstrap()
+                .group(group)
+                .channel(NioSocketChannel.class)
+                .handler(new ChannelInitializer<SocketChannel>() {
+
+                    @Override
+                    public void initChannel(SocketChannel ch) throws Exception {
+                        if (flipPosition >= 0) {
+                            ch.pipeline().addLast(new FlipHandler(flipPosition));
+                        }
+                        if (skipBytes > 0) {
+                            ch.pipeline().addLast(new SkipHandler(skipPosition, skipBytes));
+                        }
+                        ch.pipeline().addLast(new BackwardHandler(ctx.channel()));
+                    }
+
+                });
+        ChannelFuture f = b.connect(targetHost, targetPort);
+        if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+            log.debug("Connected to remote host");
+        } else {
+            log.debug("Connection to remote host timed out");
+        }
+        remote = f.channel();
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        if (remote.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+            log.debug("Connection to remote host closed");
+        } else {
+            log.debug("Closing connection to remote host timed out");
+        }
+        if (group.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+            log.debug("Group shut down");
+        } else {
+            log.debug("Shutting down group timed out");
+        }
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        remote.write(msg);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        remote.flush();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        log.error("Unexpected error, closing channel", cause);
+        ctx.close();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java (from r1765814, jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java?p2=jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java&p1=jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java&r1=1765814&r2=1765831&rev=1765831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java Thu Oct 20 15:18:53 2016
@@ -17,30 +17,31 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.segment.standby;
+package org.apache.jackrabbit.oak.segment.test.proxy;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NetworkErrorProxy implements Closeable {
 
-    static final Logger log = LoggerFactory.getLogger(NetworkErrorProxy.class);
+    private static final Logger log = LoggerFactory.getLogger(NetworkErrorProxy.class);
+
+    private static final int DEFAULT_FLIP_POSITION = -1;
+
+    private static final int DEFAULT_SKIP_POSITION = -1;
+
+    private static final int DEFAULT_SKIP_LENGTH = 0;
 
     private final int inboundPort;
 
@@ -48,290 +49,89 @@ public class NetworkErrorProxy implement
 
     private final String host;
 
-    private ChannelFuture f;
+    private int flipPosition = DEFAULT_FLIP_POSITION;
+
+    private int skipPosition = DEFAULT_SKIP_POSITION;
 
-    private ForwardHandler fh;
+    private int skipLength = DEFAULT_SKIP_LENGTH;
 
-    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+    private Channel server;
 
-    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private EventLoopGroup boss = new NioEventLoopGroup();
+
+    private EventLoopGroup worker = new NioEventLoopGroup();
 
     public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) {
         this.inboundPort = inboundPort;
         this.outboundPort = outboundPort;
         this.host = outboundHost;
-        this.fh = new ForwardHandler(NetworkErrorProxy.this.host, NetworkErrorProxy.this.outboundPort);
     }
 
     public void skipBytes(int pos, int n) {
-        this.fh.skipPosition = pos;
-        this.fh.skipBytes = n;
+        skipPosition = pos;
+        skipLength = n;
     }
 
     public void flipByte(int pos) {
-        this.fh.flipPosition = pos;
+        flipPosition = pos;
     }
 
-    public void run() throws Exception {
-        try {
-            ServerBootstrap b = new ServerBootstrap()
-                    .group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-
-                        @Override
-                        public void initChannel(SocketChannel ch) throws Exception {
-                            ch.pipeline().addLast(fh);
-                        }
-
-                    });
-
-            f = b.bind(this.inboundPort).sync();
-        } catch (Exception e) {
-            log.warn(String.format("Unable to start proxy on port %d", inboundPort), e);
+    public void connect() throws Exception {
+        log.info("Starting proxy with flip={}, skip={},{}", flipPosition, skipPosition, skipLength);
+        ServerBootstrap b = new ServerBootstrap()
+                .group(boss, worker)
+                .channel(NioServerSocketChannel.class)
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+
+                    @Override
+                    public void initChannel(SocketChannel ch) throws Exception {
+                        ch.pipeline().addLast(new ForwardHandler(host, outboundPort, flipPosition, skipPosition, skipLength));
+                    }
+
+                });
+        ChannelFuture f = b.bind(this.inboundPort);
+        if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+            log.debug("Bound on port {}", inboundPort);
+        } else {
+            log.debug("Binding on port {} timed out", inboundPort);
         }
+        server = f.channel();
     }
 
     public void reset() throws Exception {
-        if (f == null) {
-            throw new Exception("proxy not started");
-        }
-
-        if (f.channel().disconnect().awaitUninterruptibly(10, TimeUnit.SECONDS)) {
-            log.debug("Channel disconnected");
-        } else {
-            log.debug("Channel disconnect timed out");
+        flipPosition = DEFAULT_FLIP_POSITION;
+        skipPosition = DEFAULT_SKIP_POSITION;
+        skipLength = DEFAULT_SKIP_LENGTH;
+        if (server != null) {
+            if (server.disconnect().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+                log.debug("Channel disconnected");
+            } else {
+                log.debug("Channel disconnect timed out");
+            }
         }
-
-        fh = new ForwardHandler(NetworkErrorProxy.this.host, NetworkErrorProxy.this.outboundPort);
-
-        run();
+        connect();
     }
 
     @Override
     public void close() {
-        if (f != null) {
-            if (f.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS)) {
+        if (server != null) {
+            if (server.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
                 log.debug("Channel closed");
             } else {
                 log.debug("Channel close timed out");
             }
         }
-
-        if (bossGroup.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+        if (boss.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
             log.debug("Boss group shut down");
         } else {
             log.debug("Boss group shutdown timed out");
         }
-
-        if (workerGroup.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+        if (worker.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
             log.debug("Worker group shut down");
         } else {
             log.debug("Worker group shutdown timed out");
         }
     }
-}
-
-class ForwardHandler extends ChannelInboundHandlerAdapter {
-
-    private final String targetHost;
-
-    private final int targetPort;
-
-    public long transferredBytes;
-
-    public int skipPosition;
 
-    public int skipBytes;
-
-    public int flipPosition;
-
-    private ChannelFuture remote;
-
-    public ForwardHandler(String host, int port) {
-        this.targetHost = host;
-        this.targetPort = port;
-        this.flipPosition = -1;
-    }
-
-    @Override
-    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-        final ChannelHandlerContext c = ctx;
-        EventLoopGroup group = new NioEventLoopGroup();
-        Bootstrap cb = new Bootstrap();
-        cb.group(group);
-        cb.channel(NioSocketChannel.class);
-
-        cb.handler(new ChannelInitializer<SocketChannel>() {
-
-            @Override
-            public void initChannel(SocketChannel ch) throws Exception {
-                SendBackHandler sbh = new SendBackHandler(c);
-                if (ForwardHandler.this.flipPosition >= 0) {
-                    sbh = new BitFlipHandler(c, ForwardHandler.this.flipPosition);
-                } else if (ForwardHandler.this.skipBytes > 0) {
-                    sbh = new SwallowingHandler(c, ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes);
-                }
-                ch.pipeline().addFirst(sbh);
-            }
-        });
-        remote = cb.connect(this.targetHost, this.targetPort).sync();
-
-        ctx.fireChannelRegistered();
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-        remote.channel().close();
-        remote = null;
-        ctx.fireChannelUnregistered();
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof ByteBuf) {
-            ByteBuf bb = (ByteBuf) msg;
-            this.transferredBytes += (bb.writerIndex() - bb.readerIndex());
-        }
-        remote.channel().write(msg);
-    }
-
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-        remote.channel().flush();
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-        NetworkErrorProxy.log.debug(cause.getMessage(), cause);
-        ctx.close();
-    }
 }
 
-class SendBackHandler implements ChannelInboundHandler {
-
-    private final ChannelHandlerContext target;
-
-    public long transferredBytes;
-
-    public SendBackHandler(ChannelHandlerContext ctx) {
-        this.target = ctx;
-    }
-
-    @Override
-    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    @Override
-    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    public int messageSize(Object msg) {
-        if (msg instanceof ByteBuf) {
-            ByteBuf bb = (ByteBuf) msg;
-            return (bb.writerIndex() - bb.readerIndex());
-        }
-        // unknown
-        return 0;
-    }
-
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        this.transferredBytes += messageSize(msg);
-        this.target.write(msg);
-    }
-
-    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-        this.target.flush();
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-    }
-
-    @Override
-    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    @Override
-    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    @Override
-    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        NetworkErrorProxy.log.debug(cause.getMessage(), cause);
-        this.target.close();
-    }
-
-}
-
-class SwallowingHandler extends SendBackHandler {
-
-    private int skipStartingPos;
-
-    private int nrOfBytes;
-
-    public SwallowingHandler(ChannelHandlerContext ctx, int skipStartingPos, int numberOfBytes) {
-        super(ctx);
-        this.skipStartingPos = skipStartingPos;
-        this.nrOfBytes = numberOfBytes;
-    }
-
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg instanceof ByteBuf) {
-            ByteBuf bb = (ByteBuf) msg;
-            if (this.nrOfBytes > 0) {
-                if (this.transferredBytes >= this.skipStartingPos) {
-                    bb.skipBytes(this.nrOfBytes);
-                    this.nrOfBytes = 0;
-                } else {
-                    this.skipStartingPos -= messageSize(msg);
-                }
-            }
-        }
-        super.channelRead(ctx, msg);
-    }
-
-}
-
-class BitFlipHandler extends SendBackHandler {
-
-    private static final Logger log = LoggerFactory
-            .getLogger(BitFlipHandler.class);
-
-    private int startingPos;
-
-    public BitFlipHandler(ChannelHandlerContext ctx, int pos) {
-        super(ctx);
-        this.startingPos = pos;
-    }
-
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg instanceof ByteBuf) {
-            ByteBuf bb = (ByteBuf) msg;
-            log.debug("FlipHandler. Got Buffer size: " + bb.readableBytes());
-            if (this.startingPos >= 0) {
-                if (this.transferredBytes + bb.readableBytes() >= this.startingPos) {
-                    int i = this.startingPos - (int) this.transferredBytes;
-                    log.info("FlipHandler flips byte at offset " + (this.transferredBytes + i));
-                    byte b = (byte) (bb.getByte(i) ^ 0x01);
-                    bb.setByte(i, b);
-                    this.startingPos = -1;
-                }
-            }
-        }
-        super.channelRead(ctx, msg);
-    }
-
-}

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+class SkipHandler extends ChannelInboundHandlerAdapter {
+
+    private int cur;
+
+    private int pos;
+
+    private int len;
+
+    SkipHandler(int pos, int len) {
+        this.pos = pos;
+        this.len = len;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf) {
+            onByteBuf(ctx, (ByteBuf) msg);
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    private void onByteBuf(ChannelHandlerContext ctx, ByteBuf in) {
+        ByteBuf out = Unpooled.buffer();
+        for (int i = 0; i < in.readableBytes(); i++) {
+            if (cur < pos || pos + len <= cur) {
+                out.writeByte(in.getByte(i));
+            }
+            cur++;
+        }
+        if (out.readableBytes() > 0) {
+            ctx.fireChannelRead(out);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native