You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2017/10/27 12:19:53 UTC

svn commit: r1813519 - in /jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment: standby/ test/proxy/

Author: frm
Date: Fri Oct 27 12:19:53 2017
New Revision: 1813519

URL: http://svn.apache.org/viewvc?rev=1813519&view=rev
Log:
OAK-6829 - Make the configuration of NetworkErrorProxy immutable

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java Fri Oct 27 12:19:53 2017
@@ -25,16 +25,14 @@ import static org.junit.Assert.assertFal
 
 import java.io.File;
 
+import org.apache.jackrabbit.oak.commons.junit.TemporaryPort;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
 import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
 import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore;
-import org.apache.jackrabbit.oak.commons.junit.TemporaryPort;
 import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.RuleChain;
@@ -48,15 +46,10 @@ public class BrokenNetworkIT extends Tes
 
     private TemporaryFileStore clientFileStore1 = new TemporaryFileStore(folder, true);
 
-    private TemporaryFileStore clientFileStore2 = new TemporaryFileStore(folder, true);
-
-    private NetworkErrorProxy proxy;
-
     @Rule
     public RuleChain chain = RuleChain.outerRule(folder)
-            .around(serverFileStore)
-            .around(clientFileStore1)
-            .around(clientFileStore2);
+        .around(serverFileStore)
+        .around(clientFileStore1);
 
     @Rule
     public TemporaryPort serverPort = new TemporaryPort();
@@ -64,44 +57,64 @@ public class BrokenNetworkIT extends Tes
     @Rule
     public TemporaryPort proxyPort = new TemporaryPort();
 
-    @Before
-    public void beforeClass() {
-        proxy = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort());
-    }
-
-    @After
-    public void afterClass() {
-        proxy.close();
-    }
-
     @Test
     public void testProxy() throws Exception {
-        useProxy(false);
+        FileStore serverStore = serverFileStore.fileStore();
+        FileStore clientStore = clientFileStore1.fileStore();
+
+        NodeStore store = SegmentNodeStoreBuilders.builder(serverStore).build();
+        addTestContent(store, "server");
+        serverStore.flush();
+
+        try (
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverStore, MB, false);
+            StandbyClientSync clientSync = newStandbyClientSync(clientStore, serverPort.getPort(), false);
+        ) {
+            serverSync.start();
+            clientSync.run();
+        }
+
+        assertEquals(serverStore.getHead(), clientStore.getHead());
     }
 
     @Test
     public void testProxySSL() throws Exception {
-        useProxy(true);
+        FileStore storeS = serverFileStore.fileStore();
+        FileStore storeC = clientFileStore1.fileStore();
+
+        NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build();
+        addTestContent(store, "server");
+        storeS.flush();
+
+        try (
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB, true);
+            StandbyClientSync clientSync = newStandbyClientSync(storeC, serverPort.getPort(), true);
+        ) {
+            serverSync.start();
+            clientSync.run();
+        }
+
+        assertEquals(storeS.getHead(), storeC.getHead());
     }
 
     @Test
     public void testProxySkippedBytes() throws Exception {
-        useProxy(false, 100, 1, false);
+        useProxy(false, 100, 1, -1, false);
     }
 
     @Test
     public void testProxySSLSkippedBytes() throws Exception {
-        useProxy(true, 400, 10, false);
+        useProxy(true, 400, 10, -1, false);
     }
 
     @Test
     public void testProxySkippedBytesIntermediateChange() throws Exception {
-        useProxy(false, 100, 1, true);
+        useProxy(false, 100, 1, -1, true);
     }
 
     @Test
     public void testProxySSLSkippedBytesIntermediateChange() throws Exception {
-        useProxy(true, 400, 10, true);
+        useProxy(true, 400, 10, -1, true);
     }
 
     @Test
@@ -134,51 +147,37 @@ public class BrokenNetworkIT extends Tes
         useProxy(true, 0, 0, 575, false);
     }
 
-    // private helper
-
-    private void useProxy(boolean ssl) throws Exception {
-        useProxy(ssl, 0, 0, false);
-    }
-
-    private void useProxy(boolean ssl, int skipPosition, int skipBytes, boolean intermediateChange) throws Exception {
-        useProxy(ssl, skipPosition, skipBytes, -1, intermediateChange);
-    }
-
     private void useProxy(boolean ssl, int skipPosition, int skipBytes, int flipPosition, boolean intermediateChange) throws Exception {
-        FileStore storeS = serverFileStore.fileStore();
-        FileStore storeC = clientFileStore1.fileStore();
-        FileStore storeC2 = clientFileStore2.fileStore();
+        FileStore serverStore = serverFileStore.fileStore();
+        FileStore clientStore = clientFileStore1.fileStore();
 
-        NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build();
+        NodeStore store = SegmentNodeStoreBuilders.builder(serverStore).build();
         addTestContent(store, "server");
-        storeS.flush();  // this speeds up the test a little bit...
-
-        try (
-                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, 1 * MB, ssl);
-                StandbyClientSync clientSync = newStandbyClientSync(storeC, proxyPort.getPort(), ssl);
-        ) {
-            proxy.skipBytes(skipPosition, skipBytes);
-            proxy.flipByte(flipPosition);
-            proxy.connect();
+        serverStore.flush();
 
+        try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverStore, MB, ssl)) {
             serverSync.start();
 
-            clientSync.run();
-
-            if (skipBytes > 0 || flipPosition >= 0) {
-                assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead()));
-                assertEquals(storeC2.getHead(), storeC.getHead());
-
-                proxy.reset();
-                if (intermediateChange) {
-                    addTestContent(store, "server2");
-                    storeS.flush();
-                }
+            try (
+                NetworkErrorProxy ignored = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort(), flipPosition, skipPosition, skipBytes);
+                StandbyClientSync clientSync = newStandbyClientSync(clientStore, proxyPort.getPort(), ssl)
+            ) {
                 clientSync.run();
             }
 
-            assertEquals(storeS.getHead(), storeC.getHead());
+            assertFalse("stores are equal", serverStore.getHead().equals(clientStore.getHead()));
+
+            if (intermediateChange) {
+                addTestContent(store, "server2");
+                serverStore.flush();
+            }
+
+            try (StandbyClientSync clientSync = newStandbyClientSync(clientStore, serverPort.getPort(), ssl)) {
+                clientSync.run();
+            }
         }
+
+        assertEquals("stores are not equal", serverStore.getHead(), clientStore.getHead());
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java Fri Oct 27 12:19:53 2017
@@ -59,7 +59,7 @@ public abstract class DataStoreTestBase
 
     private static final Logger logger = LoggerFactory.getLogger(DataStoreTestBase.class);
 
-    static final long GB = 1024 * 1024 * 1024;
+    private static final long GB = 1024 * 1024 * 1024;
 
     @Rule
     public TemporaryPort serverPort = new TemporaryPort();
@@ -74,7 +74,7 @@ public abstract class DataStoreTestBase
 
     abstract FileStore getSecondary();
 
-    abstract boolean storesShouldBeEqual();
+    abstract boolean storesShouldBeDifferent();
 
     private InputStream newRandomInputStream(final long size, final int seed) {
         return new InputStream() {
@@ -94,9 +94,9 @@ public abstract class DataStoreTestBase
 
         };
     }
-    
+
     protected byte[] addTestContent(NodeStore store, String child, int size)
-            throws CommitFailedException, IOException {
+        throws CommitFailedException, IOException {
         NodeBuilder builder = store.getRoot().builder();
         builder.child(child).setProperty("ts", System.currentTimeMillis());
 
@@ -109,8 +109,8 @@ public abstract class DataStoreTestBase
         store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
         return data;
     }
-    
-    protected void addTestContentOnTheFly(NodeStore store, String child, long size, int seed) throws CommitFailedException, IOException {
+
+    private void addTestContentOnTheFly(NodeStore store, String child, long size, int seed) throws CommitFailedException, IOException {
         NodeBuilder builder = store.getRoot().builder();
         builder.child(child).setProperty("ts", System.currentTimeMillis());
 
@@ -131,7 +131,7 @@ public abstract class DataStoreTestBase
     public void after() {
         logger.info("Test end: {}", testName.getMethodName());
     }
-    
+
     @Test
     public void testResilientSync() throws Exception {
         final int blobSize = 5 * MB;
@@ -143,8 +143,8 @@ public abstract class DataStoreTestBase
 
         // run 1: unsuccessful
         try (
-                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
-                StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+            StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
         ) {
             serverSync.start();
             // no persisted head on primary
@@ -153,11 +153,11 @@ public abstract class DataStoreTestBase
             cl.run();
             assertNotEquals(primary.getHead(), secondary.getHead());
         }
-        
+
         // run 2: successful
         try (
-                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
-                StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+            StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
         ) {
             serverSync.start();
             // this time persisted head will be available on primary
@@ -170,14 +170,14 @@ public abstract class DataStoreTestBase
         assertTrue(secondary.getStats().getApproximateSize() < MB);
 
         PropertyState ps = secondary.getHead().getChildNode("root")
-                .getChildNode("server").getProperty("testBlob");
+            .getChildNode("server").getProperty("testBlob");
         assertNotNull(ps);
         assertEquals(Type.BINARY.tag(), ps.getType().tag());
         Blob b = ps.getValue(Type.BINARY);
         assertEquals(blobSize, b.length());
         byte[] testData = new byte[blobSize];
         try (
-                InputStream blobInputStream = b.getNewStream()
+            InputStream blobInputStream = b.getNewStream()
         ) {
             ByteStreams.readFully(blobInputStream, testData);
             assertArrayEquals(data, testData);
@@ -193,8 +193,8 @@ public abstract class DataStoreTestBase
         NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
         byte[] data = addTestContent(store, "server", blobSize);
         try (
-                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
-                StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort())
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+            StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort())
         ) {
             serverSync.start();
             primary.flush();
@@ -206,14 +206,14 @@ public abstract class DataStoreTestBase
         assertTrue(secondary.getStats().getApproximateSize() < MB);
 
         PropertyState ps = secondary.getHead().getChildNode("root")
-                .getChildNode("server").getProperty("testBlob");
+            .getChildNode("server").getProperty("testBlob");
         assertNotNull(ps);
         assertEquals(Type.BINARY.tag(), ps.getType().tag());
         Blob b = ps.getValue(Type.BINARY);
         assertEquals(blobSize, b.length());
         byte[] testData = new byte[blobSize];
         try (
-                InputStream blobInputStream = b.getNewStream()
+            InputStream blobInputStream = b.getNewStream()
         ) {
             ByteStreams.readFully(blobInputStream, testData);
             assertArrayEquals(data, testData);
@@ -225,7 +225,7 @@ public abstract class DataStoreTestBase
      */
     @Test
     public void testSyncBigBlob() throws Exception {
-        final long blobSize = (long) (1 * GB);
+        final long blobSize = GB;
         final int seed = 13;
 
         FileStore primary = getPrimary();
@@ -235,8 +235,8 @@ public abstract class DataStoreTestBase
         addTestContentOnTheFly(store, "server", blobSize, seed);
 
         try (
-                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 8 * MB);
-                StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 60_000)
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 8 * MB);
+            StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 60_000)
         ) {
             serverSync.start();
             primary.flush();
@@ -248,20 +248,20 @@ public abstract class DataStoreTestBase
         assertTrue(secondary.getStats().getApproximateSize() < MB);
 
         PropertyState ps = secondary.getHead().getChildNode("root")
-                .getChildNode("server").getProperty("testBlob");
+            .getChildNode("server").getProperty("testBlob");
         assertNotNull(ps);
         assertEquals(Type.BINARY.tag(), ps.getType().tag());
         Blob b = ps.getValue(Type.BINARY);
         assertEquals(blobSize, b.length());
 
         try (
-                InputStream randomInputStream = newRandomInputStream(blobSize, seed);
-                InputStream blobInputStream = b.getNewStream()
+            InputStream randomInputStream = newRandomInputStream(blobSize, seed);
+            InputStream blobInputStream = b.getNewStream()
         ) {
             assertTrue(IOUtils.contentEquals(randomInputStream, blobInputStream));
         }
     }
-    
+
     /*
      * See OAK-4969.
      */
@@ -274,8 +274,8 @@ public abstract class DataStoreTestBase
 
         NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
         try (
-                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
-                StandbyClientSync clientSync = newStandbyClientSync(secondary, serverPort.getPort())
+            StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+            StandbyClientSync clientSync = newStandbyClientSync(secondary, serverPort.getPort())
         ) {
             serverSync.start();
 
@@ -328,50 +328,54 @@ public abstract class DataStoreTestBase
 
     private void useProxy(int skipPosition, int skipBytes, int flipPosition, boolean intermediateChange) throws Exception {
         int blobSize = 5 * MB;
+
         FileStore primary = getPrimary();
         FileStore secondary = getSecondary();
 
         NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
         byte[] data = addTestContent(store, "server", blobSize);
+        primary.flush();
+
         try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB)) {
             serverSync.start();
-            try (NetworkErrorProxy proxy = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort())) {
-                proxy.skipBytes(skipPosition, skipBytes);
-                proxy.flipByte(flipPosition);
-                proxy.connect();
-                try (StandbyClientSync clientSync = newStandbyClientSync(secondary, proxyPort.getPort())) {
-                    primary.flush();
-                    clientSync.run();
-                    if (skipBytes > 0 || flipPosition >= 0) {
-                        if (!storesShouldBeEqual()) {
-                            assertFalse("stores are not expected to be equal", primary.getHead().equals(secondary.getHead()));
-                        }
-                        proxy.reset();
-                        if (intermediateChange) {
-                            blobSize = 2 * MB;
-                            data = addTestContent(store, "server", blobSize);
-                            primary.flush();
-                        }
-                        clientSync.run();
-                    }
-                    assertEquals(primary.getHead(), secondary.getHead());
-                }
+
+            try (
+                NetworkErrorProxy ignored = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort(), flipPosition, skipPosition, skipBytes);
+                StandbyClientSync clientSync = newStandbyClientSync(secondary, proxyPort.getPort())
+            ) {
+                clientSync.run();
+            }
+
+            if (storesShouldBeDifferent()) {
+                assertFalse("stores are equal", primary.getHead().equals(secondary.getHead()));
+            }
+
+            if (intermediateChange) {
+                blobSize = 2 * MB;
+                data = addTestContent(store, "server", blobSize);
+                primary.flush();
+            }
+
+            try (StandbyClientSync clientSync = newStandbyClientSync(secondary, serverPort.getPort())) {
+                clientSync.run();
             }
         }
 
+        assertEquals(primary.getHead(), secondary.getHead());
+
         assertTrue(primary.getStats().getApproximateSize() < MB);
         assertTrue(secondary.getStats().getApproximateSize() < MB);
 
-        PropertyState ps = secondary.getHead().getChildNode("root")
-                .getChildNode("server").getProperty("testBlob");
+        PropertyState ps = secondary.getHead()
+            .getChildNode("root")
+            .getChildNode("server")
+            .getProperty("testBlob");
         assertNotNull(ps);
         assertEquals(Type.BINARY.tag(), ps.getType().tag());
         Blob b = ps.getValue(Type.BINARY);
         assertEquals(blobSize, b.length());
         byte[] testData = new byte[blobSize];
-        try (
-                InputStream blobInputStream = b.getNewStream()
-        ) {
+        try (InputStream blobInputStream = b.getNewStream()) {
             ByteStreams.readFully(blobInputStream, testData);
             assertArrayEquals(data, testData);
         }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java Fri Oct 27 12:19:53 2017
@@ -65,8 +65,8 @@ public class ExternalPrivateStoreIT exte
     }
 
     @Override
-    boolean storesShouldBeEqual() {
-        return false;
+    boolean storesShouldBeDifferent() {
+        return true;
     }
 
     @Test

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java Fri Oct 27 12:19:53 2017
@@ -54,8 +54,8 @@ public class ExternalSharedStoreIT exten
     }
 
     @Override
-    boolean storesShouldBeEqual() {
-        return true;
+    boolean storesShouldBeDifferent() {
+        return false;
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java Fri Oct 27 12:19:53 2017
@@ -44,6 +44,11 @@ class BackwardHandler extends ChannelInb
     }
 
     @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        target.close();
+    }
+
+    @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         log.error("Unexpected error, closing channel", cause);
         target.close();

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java Fri Oct 27 12:19:53 2017
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.Atomi
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
@@ -83,27 +82,13 @@ class ForwardHandler extends ChannelInbo
                     }
 
                 });
-        ChannelFuture f = b.connect(targetHost, targetPort);
-        if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-            log.debug("Connected to remote host");
-        } else {
-            throw new Exception("Connection to remote host timed out");
-        }
-        remote = f.channel();
+        remote = b.connect(targetHost, targetPort).sync().channel();
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        if (remote.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-            log.debug("Connection to remote host closed");
-        } else {
-            log.debug("Closing connection to remote host timed out");
-        }
-        if (group.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-            log.debug("Group shut down");
-        } else {
-            log.debug("Shutting down group timed out");
-        }
+        remote.close();
+        group.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS);
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java Fri Oct 27 12:19:53 2017
@@ -25,121 +25,48 @@ import java.util.concurrent.atomic.Atomi
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class NetworkErrorProxy implements Closeable {
 
-    private static final Logger log = LoggerFactory.getLogger(NetworkErrorProxy.class);
-
     private static final AtomicInteger bossThreadNumber = new AtomicInteger(1);
 
     private static final AtomicInteger workerThreadNumber = new AtomicInteger(1);
 
-    private static final int DEFAULT_FLIP_POSITION = -1;
-
-    private static final int DEFAULT_SKIP_POSITION = -1;
-
-    private static final int DEFAULT_SKIP_LENGTH = 0;
-
-    private final int inboundPort;
-
-    private final int outboundPort;
-
-    private final String host;
-
-    private int flipPosition = DEFAULT_FLIP_POSITION;
-
-    private int skipPosition = DEFAULT_SKIP_POSITION;
-
-    private int skipLength = DEFAULT_SKIP_LENGTH;
+    private final Channel server;
 
-    private Channel server;
-
-    private EventLoopGroup boss = new NioEventLoopGroup(0, r -> {
+    private final EventLoopGroup boss = new NioEventLoopGroup(0, r -> {
         return new Thread(r, String.format("proxy-boss-%d", bossThreadNumber.getAndIncrement()));
     });
 
-    private EventLoopGroup worker = new NioEventLoopGroup(0, r -> {
+    private final EventLoopGroup worker = new NioEventLoopGroup(0, r -> {
         return new Thread(r, String.format("proxy-worker-%d", workerThreadNumber.getAndIncrement()));
     });
 
-    public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) {
-        this.inboundPort = inboundPort;
-        this.outboundPort = outboundPort;
-        this.host = outboundHost;
-    }
-
-    public void skipBytes(int pos, int n) {
-        skipPosition = pos;
-        skipLength = n;
-    }
-
-    public void flipByte(int pos) {
-        flipPosition = pos;
-    }
-
-    public void connect() throws Exception {
-        log.info("Starting proxy with flip={}, skip={},{}", flipPosition, skipPosition, skipLength);
+    public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort, int flipPosition, int skipPosition, int skipLength) throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap()
-                .group(boss, worker)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new ChannelInitializer<SocketChannel>() {
-
-                    @Override
-                    public void initChannel(SocketChannel ch) throws Exception {
-                        ch.pipeline().addLast(new ForwardHandler(host, outboundPort, flipPosition, skipPosition, skipLength));
-                    }
-
-                });
-        ChannelFuture f = b.bind(this.inboundPort);
-        if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-            log.debug("Bound on port {}", inboundPort);
-        } else {
-            throw new Exception(String.format("Binding on port %d timed out", inboundPort));
-        }
-        server = f.channel();
-    }
+            .group(boss, worker)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    ch.pipeline().addLast(new ForwardHandler(outboundHost, outboundPort, flipPosition, skipPosition, skipLength));
+                }
 
-    public void reset() throws Exception {
-        flipPosition = DEFAULT_FLIP_POSITION;
-        skipPosition = DEFAULT_SKIP_POSITION;
-        skipLength = DEFAULT_SKIP_LENGTH;
-        if (server != null) {
-            if (server.disconnect().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-                log.debug("Channel disconnected");
-            } else {
-                throw new Exception("Channel disconnect timed out");
-            }
-        }
-        connect();
+            });
+        server = b.bind(inboundPort).sync().channel();
     }
 
     @Override
     public void close() {
-        if (server != null) {
-            if (server.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-                log.debug("Channel closed");
-            } else {
-                log.debug("Channel close timed out");
-            }
-        }
-        if (boss.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-            log.debug("Boss group shut down");
-        } else {
-            log.debug("Boss group shutdown timed out");
-        }
-        if (worker.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
-            log.debug("Worker group shut down");
-        } else {
-            log.debug("Worker group shutdown timed out");
-        }
+        server.close();
+        boss.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS);
+        worker.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS);
     }
 
 }