You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2017/10/27 12:19:53 UTC
svn commit: r1813519 - in
/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment:
standby/ test/proxy/
Author: frm
Date: Fri Oct 27 12:19:53 2017
New Revision: 1813519
URL: http://svn.apache.org/viewvc?rev=1813519&view=rev
Log:
OAK-6829 - Make the configuration of NetworkErrorProxy immutable
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java Fri Oct 27 12:19:53 2017
@@ -25,16 +25,14 @@ import static org.junit.Assert.assertFal
import java.io.File;
+import org.apache.jackrabbit.oak.commons.junit.TemporaryPort;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore;
-import org.apache.jackrabbit.oak.commons.junit.TemporaryPort;
import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
@@ -48,15 +46,10 @@ public class BrokenNetworkIT extends Tes
private TemporaryFileStore clientFileStore1 = new TemporaryFileStore(folder, true);
- private TemporaryFileStore clientFileStore2 = new TemporaryFileStore(folder, true);
-
- private NetworkErrorProxy proxy;
-
@Rule
public RuleChain chain = RuleChain.outerRule(folder)
- .around(serverFileStore)
- .around(clientFileStore1)
- .around(clientFileStore2);
+ .around(serverFileStore)
+ .around(clientFileStore1);
@Rule
public TemporaryPort serverPort = new TemporaryPort();
@@ -64,44 +57,64 @@ public class BrokenNetworkIT extends Tes
@Rule
public TemporaryPort proxyPort = new TemporaryPort();
- @Before
- public void beforeClass() {
- proxy = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort());
- }
-
- @After
- public void afterClass() {
- proxy.close();
- }
-
@Test
public void testProxy() throws Exception {
- useProxy(false);
+ FileStore serverStore = serverFileStore.fileStore();
+ FileStore clientStore = clientFileStore1.fileStore();
+
+ NodeStore store = SegmentNodeStoreBuilders.builder(serverStore).build();
+ addTestContent(store, "server");
+ serverStore.flush();
+
+ try (
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverStore, MB, false);
+ StandbyClientSync clientSync = newStandbyClientSync(clientStore, serverPort.getPort(), false);
+ ) {
+ serverSync.start();
+ clientSync.run();
+ }
+
+ assertEquals(serverStore.getHead(), clientStore.getHead());
}
@Test
public void testProxySSL() throws Exception {
- useProxy(true);
+ FileStore storeS = serverFileStore.fileStore();
+ FileStore storeC = clientFileStore1.fileStore();
+
+ NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build();
+ addTestContent(store, "server");
+ storeS.flush();
+
+ try (
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB, true);
+ StandbyClientSync clientSync = newStandbyClientSync(storeC, serverPort.getPort(), true);
+ ) {
+ serverSync.start();
+ clientSync.run();
+ }
+
+ assertEquals(storeS.getHead(), storeC.getHead());
}
@Test
public void testProxySkippedBytes() throws Exception {
- useProxy(false, 100, 1, false);
+ useProxy(false, 100, 1, -1, false);
}
@Test
public void testProxySSLSkippedBytes() throws Exception {
- useProxy(true, 400, 10, false);
+ useProxy(true, 400, 10, -1, false);
}
@Test
public void testProxySkippedBytesIntermediateChange() throws Exception {
- useProxy(false, 100, 1, true);
+ useProxy(false, 100, 1, -1, true);
}
@Test
public void testProxySSLSkippedBytesIntermediateChange() throws Exception {
- useProxy(true, 400, 10, true);
+ useProxy(true, 400, 10, -1, true);
}
@Test
@@ -134,51 +147,37 @@ public class BrokenNetworkIT extends Tes
useProxy(true, 0, 0, 575, false);
}
- // private helper
-
- private void useProxy(boolean ssl) throws Exception {
- useProxy(ssl, 0, 0, false);
- }
-
- private void useProxy(boolean ssl, int skipPosition, int skipBytes, boolean intermediateChange) throws Exception {
- useProxy(ssl, skipPosition, skipBytes, -1, intermediateChange);
- }
-
private void useProxy(boolean ssl, int skipPosition, int skipBytes, int flipPosition, boolean intermediateChange) throws Exception {
- FileStore storeS = serverFileStore.fileStore();
- FileStore storeC = clientFileStore1.fileStore();
- FileStore storeC2 = clientFileStore2.fileStore();
+ FileStore serverStore = serverFileStore.fileStore();
+ FileStore clientStore = clientFileStore1.fileStore();
- NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build();
+ NodeStore store = SegmentNodeStoreBuilders.builder(serverStore).build();
addTestContent(store, "server");
- storeS.flush(); // this speeds up the test a little bit...
-
- try (
- StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, 1 * MB, ssl);
- StandbyClientSync clientSync = newStandbyClientSync(storeC, proxyPort.getPort(), ssl);
- ) {
- proxy.skipBytes(skipPosition, skipBytes);
- proxy.flipByte(flipPosition);
- proxy.connect();
+ serverStore.flush();
+ try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverStore, MB, ssl)) {
serverSync.start();
- clientSync.run();
-
- if (skipBytes > 0 || flipPosition >= 0) {
- assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead()));
- assertEquals(storeC2.getHead(), storeC.getHead());
-
- proxy.reset();
- if (intermediateChange) {
- addTestContent(store, "server2");
- storeS.flush();
- }
+ try (
+ NetworkErrorProxy ignored = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort(), flipPosition, skipPosition, skipBytes);
+ StandbyClientSync clientSync = newStandbyClientSync(clientStore, proxyPort.getPort(), ssl)
+ ) {
clientSync.run();
}
- assertEquals(storeS.getHead(), storeC.getHead());
+ assertFalse("stores are equal", serverStore.getHead().equals(clientStore.getHead()));
+
+ if (intermediateChange) {
+ addTestContent(store, "server2");
+ serverStore.flush();
+ }
+
+ try (StandbyClientSync clientSync = newStandbyClientSync(clientStore, serverPort.getPort(), ssl)) {
+ clientSync.run();
+ }
}
+
+ assertEquals("stores are not equal", serverStore.getHead(), clientStore.getHead());
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java Fri Oct 27 12:19:53 2017
@@ -59,7 +59,7 @@ public abstract class DataStoreTestBase
private static final Logger logger = LoggerFactory.getLogger(DataStoreTestBase.class);
- static final long GB = 1024 * 1024 * 1024;
+ private static final long GB = 1024 * 1024 * 1024;
@Rule
public TemporaryPort serverPort = new TemporaryPort();
@@ -74,7 +74,7 @@ public abstract class DataStoreTestBase
abstract FileStore getSecondary();
- abstract boolean storesShouldBeEqual();
+ abstract boolean storesShouldBeDifferent();
private InputStream newRandomInputStream(final long size, final int seed) {
return new InputStream() {
@@ -94,9 +94,9 @@ public abstract class DataStoreTestBase
};
}
-
+
protected byte[] addTestContent(NodeStore store, String child, int size)
- throws CommitFailedException, IOException {
+ throws CommitFailedException, IOException {
NodeBuilder builder = store.getRoot().builder();
builder.child(child).setProperty("ts", System.currentTimeMillis());
@@ -109,8 +109,8 @@ public abstract class DataStoreTestBase
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return data;
}
-
- protected void addTestContentOnTheFly(NodeStore store, String child, long size, int seed) throws CommitFailedException, IOException {
+
+ private void addTestContentOnTheFly(NodeStore store, String child, long size, int seed) throws CommitFailedException, IOException {
NodeBuilder builder = store.getRoot().builder();
builder.child(child).setProperty("ts", System.currentTimeMillis());
@@ -131,7 +131,7 @@ public abstract class DataStoreTestBase
public void after() {
logger.info("Test end: {}", testName.getMethodName());
}
-
+
@Test
public void testResilientSync() throws Exception {
final int blobSize = 5 * MB;
@@ -143,8 +143,8 @@ public abstract class DataStoreTestBase
// run 1: unsuccessful
try (
- StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
- StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+ StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
) {
serverSync.start();
// no persisted head on primary
@@ -153,11 +153,11 @@ public abstract class DataStoreTestBase
cl.run();
assertNotEquals(primary.getHead(), secondary.getHead());
}
-
+
// run 2: successful
try (
- StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
- StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+ StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
) {
serverSync.start();
// this time persisted head will be available on primary
@@ -170,14 +170,14 @@ public abstract class DataStoreTestBase
assertTrue(secondary.getStats().getApproximateSize() < MB);
PropertyState ps = secondary.getHead().getChildNode("root")
- .getChildNode("server").getProperty("testBlob");
+ .getChildNode("server").getProperty("testBlob");
assertNotNull(ps);
assertEquals(Type.BINARY.tag(), ps.getType().tag());
Blob b = ps.getValue(Type.BINARY);
assertEquals(blobSize, b.length());
byte[] testData = new byte[blobSize];
try (
- InputStream blobInputStream = b.getNewStream()
+ InputStream blobInputStream = b.getNewStream()
) {
ByteStreams.readFully(blobInputStream, testData);
assertArrayEquals(data, testData);
@@ -193,8 +193,8 @@ public abstract class DataStoreTestBase
NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
byte[] data = addTestContent(store, "server", blobSize);
try (
- StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
- StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort())
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+ StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort())
) {
serverSync.start();
primary.flush();
@@ -206,14 +206,14 @@ public abstract class DataStoreTestBase
assertTrue(secondary.getStats().getApproximateSize() < MB);
PropertyState ps = secondary.getHead().getChildNode("root")
- .getChildNode("server").getProperty("testBlob");
+ .getChildNode("server").getProperty("testBlob");
assertNotNull(ps);
assertEquals(Type.BINARY.tag(), ps.getType().tag());
Blob b = ps.getValue(Type.BINARY);
assertEquals(blobSize, b.length());
byte[] testData = new byte[blobSize];
try (
- InputStream blobInputStream = b.getNewStream()
+ InputStream blobInputStream = b.getNewStream()
) {
ByteStreams.readFully(blobInputStream, testData);
assertArrayEquals(data, testData);
@@ -225,7 +225,7 @@ public abstract class DataStoreTestBase
*/
@Test
public void testSyncBigBlob() throws Exception {
- final long blobSize = (long) (1 * GB);
+ final long blobSize = GB;
final int seed = 13;
FileStore primary = getPrimary();
@@ -235,8 +235,8 @@ public abstract class DataStoreTestBase
addTestContentOnTheFly(store, "server", blobSize, seed);
try (
- StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 8 * MB);
- StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 60_000)
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 8 * MB);
+ StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 60_000)
) {
serverSync.start();
primary.flush();
@@ -248,20 +248,20 @@ public abstract class DataStoreTestBase
assertTrue(secondary.getStats().getApproximateSize() < MB);
PropertyState ps = secondary.getHead().getChildNode("root")
- .getChildNode("server").getProperty("testBlob");
+ .getChildNode("server").getProperty("testBlob");
assertNotNull(ps);
assertEquals(Type.BINARY.tag(), ps.getType().tag());
Blob b = ps.getValue(Type.BINARY);
assertEquals(blobSize, b.length());
try (
- InputStream randomInputStream = newRandomInputStream(blobSize, seed);
- InputStream blobInputStream = b.getNewStream()
+ InputStream randomInputStream = newRandomInputStream(blobSize, seed);
+ InputStream blobInputStream = b.getNewStream()
) {
assertTrue(IOUtils.contentEquals(randomInputStream, blobInputStream));
}
}
-
+
/*
* See OAK-4969.
*/
@@ -274,8 +274,8 @@ public abstract class DataStoreTestBase
NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
try (
- StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
- StandbyClientSync clientSync = newStandbyClientSync(secondary, serverPort.getPort())
+ StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB);
+ StandbyClientSync clientSync = newStandbyClientSync(secondary, serverPort.getPort())
) {
serverSync.start();
@@ -328,50 +328,54 @@ public abstract class DataStoreTestBase
private void useProxy(int skipPosition, int skipBytes, int flipPosition, boolean intermediateChange) throws Exception {
int blobSize = 5 * MB;
+
FileStore primary = getPrimary();
FileStore secondary = getSecondary();
NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
byte[] data = addTestContent(store, "server", blobSize);
+ primary.flush();
+
try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB)) {
serverSync.start();
- try (NetworkErrorProxy proxy = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort())) {
- proxy.skipBytes(skipPosition, skipBytes);
- proxy.flipByte(flipPosition);
- proxy.connect();
- try (StandbyClientSync clientSync = newStandbyClientSync(secondary, proxyPort.getPort())) {
- primary.flush();
- clientSync.run();
- if (skipBytes > 0 || flipPosition >= 0) {
- if (!storesShouldBeEqual()) {
- assertFalse("stores are not expected to be equal", primary.getHead().equals(secondary.getHead()));
- }
- proxy.reset();
- if (intermediateChange) {
- blobSize = 2 * MB;
- data = addTestContent(store, "server", blobSize);
- primary.flush();
- }
- clientSync.run();
- }
- assertEquals(primary.getHead(), secondary.getHead());
- }
+
+ try (
+ NetworkErrorProxy ignored = new NetworkErrorProxy(proxyPort.getPort(), getServerHost(), serverPort.getPort(), flipPosition, skipPosition, skipBytes);
+ StandbyClientSync clientSync = newStandbyClientSync(secondary, proxyPort.getPort())
+ ) {
+ clientSync.run();
+ }
+
+ if (storesShouldBeDifferent()) {
+ assertFalse("stores are equal", primary.getHead().equals(secondary.getHead()));
+ }
+
+ if (intermediateChange) {
+ blobSize = 2 * MB;
+ data = addTestContent(store, "server", blobSize);
+ primary.flush();
+ }
+
+ try (StandbyClientSync clientSync = newStandbyClientSync(secondary, serverPort.getPort())) {
+ clientSync.run();
}
}
+ assertEquals(primary.getHead(), secondary.getHead());
+
assertTrue(primary.getStats().getApproximateSize() < MB);
assertTrue(secondary.getStats().getApproximateSize() < MB);
- PropertyState ps = secondary.getHead().getChildNode("root")
- .getChildNode("server").getProperty("testBlob");
+ PropertyState ps = secondary.getHead()
+ .getChildNode("root")
+ .getChildNode("server")
+ .getProperty("testBlob");
assertNotNull(ps);
assertEquals(Type.BINARY.tag(), ps.getType().tag());
Blob b = ps.getValue(Type.BINARY);
assertEquals(blobSize, b.length());
byte[] testData = new byte[blobSize];
- try (
- InputStream blobInputStream = b.getNewStream()
- ) {
+ try (InputStream blobInputStream = b.getNewStream()) {
ByteStreams.readFully(blobInputStream, testData);
assertArrayEquals(data, testData);
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java Fri Oct 27 12:19:53 2017
@@ -65,8 +65,8 @@ public class ExternalPrivateStoreIT exte
}
@Override
- boolean storesShouldBeEqual() {
- return false;
+ boolean storesShouldBeDifferent() {
+ return true;
}
@Test
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java Fri Oct 27 12:19:53 2017
@@ -54,8 +54,8 @@ public class ExternalSharedStoreIT exten
}
@Override
- boolean storesShouldBeEqual() {
- return true;
+ boolean storesShouldBeDifferent() {
+ return false;
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java Fri Oct 27 12:19:53 2017
@@ -44,6 +44,11 @@ class BackwardHandler extends ChannelInb
}
@Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ target.close();
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Unexpected error, closing channel", cause);
target.close();
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java Fri Oct 27 12:19:53 2017
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.Atomi
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
@@ -83,27 +82,13 @@ class ForwardHandler extends ChannelInbo
}
});
- ChannelFuture f = b.connect(targetHost, targetPort);
- if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Connected to remote host");
- } else {
- throw new Exception("Connection to remote host timed out");
- }
- remote = f.channel();
+ remote = b.connect(targetHost, targetPort).sync().channel();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- if (remote.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Connection to remote host closed");
- } else {
- log.debug("Closing connection to remote host timed out");
- }
- if (group.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Group shut down");
- } else {
- log.debug("Shutting down group timed out");
- }
+ remote.close();
+ group.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS);
}
@Override
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java?rev=1813519&r1=1813518&r2=1813519&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java Fri Oct 27 12:19:53 2017
@@ -25,121 +25,48 @@ import java.util.concurrent.atomic.Atomi
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class NetworkErrorProxy implements Closeable {
- private static final Logger log = LoggerFactory.getLogger(NetworkErrorProxy.class);
-
private static final AtomicInteger bossThreadNumber = new AtomicInteger(1);
private static final AtomicInteger workerThreadNumber = new AtomicInteger(1);
- private static final int DEFAULT_FLIP_POSITION = -1;
-
- private static final int DEFAULT_SKIP_POSITION = -1;
-
- private static final int DEFAULT_SKIP_LENGTH = 0;
-
- private final int inboundPort;
-
- private final int outboundPort;
-
- private final String host;
-
- private int flipPosition = DEFAULT_FLIP_POSITION;
-
- private int skipPosition = DEFAULT_SKIP_POSITION;
-
- private int skipLength = DEFAULT_SKIP_LENGTH;
+ private final Channel server;
- private Channel server;
-
- private EventLoopGroup boss = new NioEventLoopGroup(0, r -> {
+ private final EventLoopGroup boss = new NioEventLoopGroup(0, r -> {
return new Thread(r, String.format("proxy-boss-%d", bossThreadNumber.getAndIncrement()));
});
- private EventLoopGroup worker = new NioEventLoopGroup(0, r -> {
+ private final EventLoopGroup worker = new NioEventLoopGroup(0, r -> {
return new Thread(r, String.format("proxy-worker-%d", workerThreadNumber.getAndIncrement()));
});
- public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) {
- this.inboundPort = inboundPort;
- this.outboundPort = outboundPort;
- this.host = outboundHost;
- }
-
- public void skipBytes(int pos, int n) {
- skipPosition = pos;
- skipLength = n;
- }
-
- public void flipByte(int pos) {
- flipPosition = pos;
- }
-
- public void connect() throws Exception {
- log.info("Starting proxy with flip={}, skip={},{}", flipPosition, skipPosition, skipLength);
+ public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort, int flipPosition, int skipPosition, int skipLength) throws InterruptedException {
ServerBootstrap b = new ServerBootstrap()
- .group(boss, worker)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ForwardHandler(host, outboundPort, flipPosition, skipPosition, skipLength));
- }
-
- });
- ChannelFuture f = b.bind(this.inboundPort);
- if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Bound on port {}", inboundPort);
- } else {
- throw new Exception(String.format("Binding on port %d timed out", inboundPort));
- }
- server = f.channel();
- }
+ .group(boss, worker)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(new ForwardHandler(outboundHost, outboundPort, flipPosition, skipPosition, skipLength));
+ }
- public void reset() throws Exception {
- flipPosition = DEFAULT_FLIP_POSITION;
- skipPosition = DEFAULT_SKIP_POSITION;
- skipLength = DEFAULT_SKIP_LENGTH;
- if (server != null) {
- if (server.disconnect().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Channel disconnected");
- } else {
- throw new Exception("Channel disconnect timed out");
- }
- }
- connect();
+ });
+ server = b.bind(inboundPort).sync().channel();
}
@Override
public void close() {
- if (server != null) {
- if (server.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Channel closed");
- } else {
- log.debug("Channel close timed out");
- }
- }
- if (boss.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Boss group shut down");
- } else {
- log.debug("Boss group shutdown timed out");
- }
- if (worker.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
- log.debug("Worker group shut down");
- } else {
- log.debug("Worker group shutdown timed out");
- }
+ server.close();
+ boss.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS);
+ worker.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS);
}
}