You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/10/20 15:18:53 UTC
svn commit: r1765831 - in
/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment:
standby/ test/proxy/
Author: frm
Date: Thu Oct 20 15:18:53 2016
New Revision: 1765831
URL: http://svn.apache.org/viewvc?rev=1765831&view=rev
Log:
OAK-4958 - Improve consistency of NetworkErrorProxy
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java
- copied, changed from r1765814, jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java (with props)
Removed:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java?rev=1765831&r1=1765830&r2=1765831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java Thu Oct 20 15:18:53 2016
@@ -30,6 +30,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore;
+import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.Rule;
import org.junit.Test;
@@ -138,7 +139,7 @@ public class BrokenNetworkIT extends Tes
) {
p.skipBytes(skipPosition, skipBytes);
p.flipByte(flipPosition);
- p.run();
+ p.connect();
serverSync.start();
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1765831&r1=1765830&r2=1765831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java Thu Oct 20 15:18:53 2016
@@ -26,28 +26,23 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
-import java.io.File;
import java.io.IOException;
import java.util.Random;
-import java.util.concurrent.ScheduledExecutorService;
import com.google.common.io.ByteStreams;
-import org.apache.jackrabbit.core.data.FileDataStore;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync;
import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync;
+import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.junit.Test;
public abstract class DataStoreTestBase extends TestBase {
@@ -156,7 +151,7 @@ public abstract class DataStoreTestBase
) {
p.skipBytes(skipPosition, skipBytes);
p.flipByte(flipPosition);
- p.run();
+ p.connect();
serverSync.start();
primary.flush();
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Relays every message read on the channel this handler is installed on to
+ * another channel supplied at construction time.
+ */
+class BackwardHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger log = LoggerFactory.getLogger(BackwardHandler.class);
+
+    /** Channel that receives everything this handler reads. */
+    private final Channel destination;
+
+    /**
+     * @param target the channel inbound messages are relayed to
+     */
+    BackwardHandler(Channel target) {
+        destination = target;
+    }
+
+    /** Queues the message on the destination channel; flushing happens in {@link #channelReadComplete}. */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        destination.write(msg);
+    }
+
+    /** Flushes whatever was queued on the destination channel during the read batch. */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        destination.flush();
+    }
+
+    /** Logs the failure and closes the destination channel. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.error("Unexpected error, closing channel", cause);
+        destination.close();
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/BackwardHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Inverts all bits of a single byte of the inbound stream and passes every
+ * message on to the next handler.
+ */
+class FlipHandler extends ChannelInboundHandlerAdapter {
+
+    /** Number of stream bytes seen in previous buffers. */
+    private int current;
+
+    /** Zero-based stream offset of the byte to invert. */
+    private final int flip;
+
+    /**
+     * @param pos zero-based offset, counted over all buffers read by this
+     *            handler, of the byte to invert
+     */
+    FlipHandler(int pos) {
+        this.flip = pos;
+    }
+
+    /** Corrupts the message if it is a buffer holding the target byte, then forwards it. */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf) {
+            onByteBuf((ByteBuf) msg);
+        }
+        ctx.fireChannelRead(msg);
+    }
+
+    /**
+     * Inverts the target byte in place if it falls inside the readable region
+     * of {@code b}, and advances the stream position by that region's size.
+     */
+    private void onByteBuf(ByteBuf b) {
+        int readable = b.readableBytes();
+        int offset = flip - current;
+        if (0 <= offset && offset < readable) {
+            // getByte/setByte take absolute indices, so the stream-relative
+            // offset has to be rebased on the reader index.
+            int index = b.readerIndex() + offset;
+            b.setByte(index, ~b.getByte(index));
+        }
+        current += readable;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/FlipHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwards everything read from an accepted channel to a target host. A
+ * dedicated connection to the target is opened when the accepted channel is
+ * registered; data coming back on that connection runs through an optional
+ * {@link FlipHandler} and {@link SkipHandler} before a {@link BackwardHandler}
+ * writes it to the accepted channel.
+ */
+class ForwardHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger log = LoggerFactory.getLogger(ForwardHandler.class);
+
+    private final String targetHost;
+
+    private final int targetPort;
+
+    private final int skipPosition;
+
+    private final int skipBytes;
+
+    private final int flipPosition;
+
+    /** Channel to the target host; assigned in {@link #channelRegistered}. */
+    private Channel remote;
+
+    /** Event loop group driving {@link #remote}; assigned in {@link #channelRegistered}. */
+    private EventLoopGroup group;
+
+    /**
+     * @param host         host to forward to
+     * @param port         port to forward to
+     * @param flipPosition position handed to the {@link FlipHandler}; the handler is
+     *                     only installed when this is not negative
+     * @param skipPosition position handed to the {@link SkipHandler}
+     * @param skipBytes    length handed to the {@link SkipHandler}; the handler is
+     *                     only installed when this is positive
+     */
+    ForwardHandler(String host, int port, int flipPosition, int skipPosition, int skipBytes) {
+        this.targetHost = host;
+        this.targetPort = port;
+        this.flipPosition = flipPosition;
+        this.skipPosition = skipPosition;
+        this.skipBytes = skipBytes;
+    }
+
+    /**
+     * Connects to the target host, waiting at most one second for the attempt
+     * to complete. If the attempt completes with a failure the accepted channel
+     * is closed, because nothing could be forwarded on its behalf.
+     */
+    @Override
+    public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
+        group = new NioEventLoopGroup();
+        Bootstrap b = new Bootstrap()
+            .group(group)
+            .channel(NioSocketChannel.class)
+            .handler(new ChannelInitializer<SocketChannel>() {
+
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    if (flipPosition >= 0) {
+                        ch.pipeline().addLast(new FlipHandler(flipPosition));
+                    }
+                    if (skipBytes > 0) {
+                        ch.pipeline().addLast(new SkipHandler(skipPosition, skipBytes));
+                    }
+                    ch.pipeline().addLast(new BackwardHandler(ctx.channel()));
+                }
+
+            });
+        ChannelFuture f = b.connect(targetHost, targetPort);
+        remote = f.channel();
+        if (!f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+            log.debug("Connection to remote host timed out");
+        } else if (f.isSuccess()) {
+            log.debug("Connected to remote host");
+        } else {
+            // The wait also returns true when the attempt completed with an
+            // error, so success has to be checked separately.
+            log.warn("Unable to connect to remote host", f.cause());
+            ctx.close();
+        }
+    }
+
+    /**
+     * Closes the connection to the target host and shuts down its event loop
+     * group, waiting at most one second for each step.
+     */
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        // Either field may still be null if channelRegistered failed part-way.
+        if (remote != null) {
+            if (remote.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+                log.debug("Connection to remote host closed");
+            } else {
+                log.debug("Closing connection to remote host timed out");
+            }
+        }
+        if (group != null) {
+            if (group.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+                log.debug("Group shut down");
+            } else {
+                log.debug("Shutting down group timed out");
+            }
+        }
+    }
+
+    /** Queues the message on the connection to the target host. */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        remote.write(msg);
+    }
+
+    /** Flushes the messages queued by the last read batch. */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        remote.flush();
+    }
+
+    /** Logs the error and closes the accepted channel. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        log.error("Unexpected error, closing channel", cause);
+        ctx.close();
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/ForwardHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java (from r1765814, jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java?p2=jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java&p1=jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java&r1=1765814&r2=1765831&rev=1765831&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/NetworkErrorProxy.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/NetworkErrorProxy.java Thu Oct 20 15:18:53 2016
@@ -17,30 +17,31 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.segment.standby;
+package org.apache.jackrabbit.oak.segment.test.proxy;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
-import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetworkErrorProxy implements Closeable {
- static final Logger log = LoggerFactory.getLogger(NetworkErrorProxy.class);
+ private static final Logger log = LoggerFactory.getLogger(NetworkErrorProxy.class);
+
+ private static final int DEFAULT_FLIP_POSITION = -1;
+
+ private static final int DEFAULT_SKIP_POSITION = -1;
+
+ private static final int DEFAULT_SKIP_LENGTH = 0;
private final int inboundPort;
@@ -48,290 +49,89 @@ public class NetworkErrorProxy implement
private final String host;
- private ChannelFuture f;
+ private int flipPosition = DEFAULT_FLIP_POSITION;
+
+ private int skipPosition = DEFAULT_SKIP_POSITION;
- private ForwardHandler fh;
+ private int skipLength = DEFAULT_SKIP_LENGTH;
- private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+ private Channel server;
- private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private EventLoopGroup boss = new NioEventLoopGroup();
+
+ private EventLoopGroup worker = new NioEventLoopGroup();
public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) {
this.inboundPort = inboundPort;
this.outboundPort = outboundPort;
this.host = outboundHost;
- this.fh = new ForwardHandler(NetworkErrorProxy.this.host, NetworkErrorProxy.this.outboundPort);
}
public void skipBytes(int pos, int n) {
- this.fh.skipPosition = pos;
- this.fh.skipBytes = n;
+ skipPosition = pos;
+ skipLength = n;
}
public void flipByte(int pos) {
- this.fh.flipPosition = pos;
+ flipPosition = pos;
}
- public void run() throws Exception {
- try {
- ServerBootstrap b = new ServerBootstrap()
- .group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(fh);
- }
-
- });
-
- f = b.bind(this.inboundPort).sync();
- } catch (Exception e) {
- log.warn(String.format("Unable to start proxy on port %d", inboundPort), e);
+ public void connect() throws Exception {
+ log.info("Starting proxy with flip={}, skip={},{}", flipPosition, skipPosition, skipLength);
+ ServerBootstrap b = new ServerBootstrap()
+ .group(boss, worker)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(new ForwardHandler(host, outboundPort, flipPosition, skipPosition, skipLength));
+ }
+
+ });
+ ChannelFuture f = b.bind(this.inboundPort);
+ if (f.awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+ log.debug("Bound on port {}", inboundPort);
+ } else {
+ log.debug("Binding on port {} timed out", inboundPort);
}
+ server = f.channel();
}
public void reset() throws Exception {
- if (f == null) {
- throw new Exception("proxy not started");
- }
-
- if (f.channel().disconnect().awaitUninterruptibly(10, TimeUnit.SECONDS)) {
- log.debug("Channel disconnected");
- } else {
- log.debug("Channel disconnect timed out");
+ flipPosition = DEFAULT_FLIP_POSITION;
+ skipPosition = DEFAULT_SKIP_POSITION;
+ skipLength = DEFAULT_SKIP_LENGTH;
+ if (server != null) {
+ if (server.disconnect().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+ log.debug("Channel disconnected");
+ } else {
+ log.debug("Channel disconnect timed out");
+ }
}
-
- fh = new ForwardHandler(NetworkErrorProxy.this.host, NetworkErrorProxy.this.outboundPort);
-
- run();
+ connect();
}
@Override
public void close() {
- if (f != null) {
- if (f.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS)) {
+ if (server != null) {
+ if (server.close().awaitUninterruptibly(1, TimeUnit.SECONDS)) {
log.debug("Channel closed");
} else {
log.debug("Channel close timed out");
}
}
-
- if (bossGroup.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+ if (boss.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
log.debug("Boss group shut down");
} else {
log.debug("Boss group shutdown timed out");
}
-
- if (workerGroup.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
+ if (worker.shutdownGracefully(0, 150, TimeUnit.MILLISECONDS).awaitUninterruptibly(1, TimeUnit.SECONDS)) {
log.debug("Worker group shut down");
} else {
log.debug("Worker group shutdown timed out");
}
}
-}
-
-class ForwardHandler extends ChannelInboundHandlerAdapter {
-
- private final String targetHost;
-
- private final int targetPort;
-
- public long transferredBytes;
-
- public int skipPosition;
- public int skipBytes;
-
- public int flipPosition;
-
- private ChannelFuture remote;
-
- public ForwardHandler(String host, int port) {
- this.targetHost = host;
- this.targetPort = port;
- this.flipPosition = -1;
- }
-
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- final ChannelHandlerContext c = ctx;
- EventLoopGroup group = new NioEventLoopGroup();
- Bootstrap cb = new Bootstrap();
- cb.group(group);
- cb.channel(NioSocketChannel.class);
-
- cb.handler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- SendBackHandler sbh = new SendBackHandler(c);
- if (ForwardHandler.this.flipPosition >= 0) {
- sbh = new BitFlipHandler(c, ForwardHandler.this.flipPosition);
- } else if (ForwardHandler.this.skipBytes > 0) {
- sbh = new SwallowingHandler(c, ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes);
- }
- ch.pipeline().addFirst(sbh);
- }
- });
- remote = cb.connect(this.targetHost, this.targetPort).sync();
-
- ctx.fireChannelRegistered();
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- remote.channel().close();
- remote = null;
- ctx.fireChannelUnregistered();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof ByteBuf) {
- ByteBuf bb = (ByteBuf) msg;
- this.transferredBytes += (bb.writerIndex() - bb.readerIndex());
- }
- remote.channel().write(msg);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- remote.channel().flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- NetworkErrorProxy.log.debug(cause.getMessage(), cause);
- ctx.close();
- }
}
-class SendBackHandler implements ChannelInboundHandler {
-
- private final ChannelHandlerContext target;
-
- public long transferredBytes;
-
- public SendBackHandler(ChannelHandlerContext ctx) {
- this.target = ctx;
- }
-
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- }
-
- public int messageSize(Object msg) {
- if (msg instanceof ByteBuf) {
- ByteBuf bb = (ByteBuf) msg;
- return (bb.writerIndex() - bb.readerIndex());
- }
- // unknown
- return 0;
- }
-
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- this.transferredBytes += messageSize(msg);
- this.target.write(msg);
- }
-
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- this.target.flush();
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- }
-
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- NetworkErrorProxy.log.debug(cause.getMessage(), cause);
- this.target.close();
- }
-
-}
-
-class SwallowingHandler extends SendBackHandler {
-
- private int skipStartingPos;
-
- private int nrOfBytes;
-
- public SwallowingHandler(ChannelHandlerContext ctx, int skipStartingPos, int numberOfBytes) {
- super(ctx);
- this.skipStartingPos = skipStartingPos;
- this.nrOfBytes = numberOfBytes;
- }
-
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof ByteBuf) {
- ByteBuf bb = (ByteBuf) msg;
- if (this.nrOfBytes > 0) {
- if (this.transferredBytes >= this.skipStartingPos) {
- bb.skipBytes(this.nrOfBytes);
- this.nrOfBytes = 0;
- } else {
- this.skipStartingPos -= messageSize(msg);
- }
- }
- }
- super.channelRead(ctx, msg);
- }
-
-}
-
-class BitFlipHandler extends SendBackHandler {
-
- private static final Logger log = LoggerFactory
- .getLogger(BitFlipHandler.class);
-
- private int startingPos;
-
- public BitFlipHandler(ChannelHandlerContext ctx, int pos) {
- super(ctx);
- this.startingPos = pos;
- }
-
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof ByteBuf) {
- ByteBuf bb = (ByteBuf) msg;
- log.debug("FlipHandler. Got Buffer size: " + bb.readableBytes());
- if (this.startingPos >= 0) {
- if (this.transferredBytes + bb.readableBytes() >= this.startingPos) {
- int i = this.startingPos - (int) this.transferredBytes;
- log.info("FlipHandler flips byte at offset " + (this.transferredBytes + i));
- byte b = (byte) (bb.getByte(i) ^ 0x01);
- bb.setByte(i, b);
- this.startingPos = -1;
- }
- }
- }
- super.channelRead(ctx, msg);
- }
-
-}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java?rev=1765831&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java Thu Oct 20 15:18:53 2016
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.test.proxy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Drops a contiguous range of bytes from the inbound stream and passes the
+ * remaining bytes on to the next handler.
+ */
+class SkipHandler extends ChannelInboundHandlerAdapter {
+
+    /** Zero-based stream offset of the next byte to be examined. */
+    private int cur;
+
+    /** Stream offset of the first byte to drop. */
+    private final int pos;
+
+    /** Number of bytes to drop. */
+    private final int len;
+
+    /**
+     * @param pos zero-based offset, counted over all buffers read by this
+     *            handler, of the first byte to drop
+     * @param len number of consecutive bytes to drop
+     */
+    SkipHandler(int pos, int len) {
+        this.pos = pos;
+        this.len = len;
+    }
+
+    /** Filters buffers through {@link #onByteBuf}; other messages are forwarded untouched. */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf) {
+            onByteBuf(ctx, (ByteBuf) msg);
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    /**
+     * Copies the bytes of {@code in} that lie outside the dropped range into a
+     * new buffer and forwards it, unless nothing is left to forward.
+     */
+    private void onByteBuf(ChannelHandlerContext ctx, ByteBuf in) {
+        ByteBuf out = Unpooled.buffer(in.readableBytes());
+        try {
+            // getByte takes absolute indices, so iterate over the readable region.
+            for (int i = in.readerIndex(); i < in.writerIndex(); i++) {
+                if (cur < pos || pos + len <= cur) {
+                    out.writeByte(in.getByte(i));
+                }
+                cur++;
+            }
+        } finally {
+            // The input is consumed here and never travels further down the
+            // pipeline, so this handler is the one that has to release it.
+            in.release();
+        }
+        if (out.isReadable()) {
+            ctx.fireChannelRead(out);
+        } else {
+            out.release();
+        }
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/test/proxy/SkipHandler.java
------------------------------------------------------------------------------
svn:eol-style = native