You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/07/17 17:58:57 UTC
[64/94] [abbrv] hbase git commit: HBASE-18307 Share the same
EventLoopGroup for NettyRpcServer,
NettyRpcClient and AsyncFSWALProvider at RS side
HBASE-18307 Share the same EventLoopGroup for NettyRpcServer, NettyRpcClient and AsyncFSWALProvider at RS side
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35170345
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35170345
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35170345
Branch: refs/heads/HBASE-14070.HLC
Commit: 351703455a091171a1abc90f250f52f0a7a0aaab
Parents: 1ddcc07
Author: zhangduo <zh...@apache.org>
Authored: Mon Jul 10 16:33:37 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Jul 10 21:00:44 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 1 -
.../hbase/io/asyncfs/AsyncFSOutputHelper.java | 7 +-
.../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 9 +-
.../FanOutOneBlockAsyncDFSOutputHelper.java | 15 ++--
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 93 ++++++++------------
.../hbase/regionserver/HRegionServer.java | 36 +++++---
.../hbase/regionserver/wal/AsyncFSWAL.java | 9 +-
.../wal/AsyncProtobufLogWriter.java | 8 +-
.../wal/SecureAsyncProtobufLogWriter.java | 5 +-
.../hbase/util/NettyEventLoopGroupConfig.java | 82 +++++++++++++++++
.../hadoop/hbase/wal/AsyncFSWALProvider.java | 27 ++++--
.../hbase/wal/NettyAsyncFSWALConfigHelper.java | 63 +++++++++++++
.../TestFanOutOneBlockAsyncDFSOutput.java | 42 +++++----
.../hbase/io/asyncfs/TestLocalAsyncOutput.java | 8 +-
.../TestSaslFanOutOneBlockAsyncDFSOutput.java | 9 +-
.../hbase/regionserver/wal/TestAsyncFSWAL.java | 9 +-
.../regionserver/wal/TestAsyncProtobufLog.java | 7 +-
.../regionserver/wal/TestAsyncWALReplay.java | 7 +-
18 files changed, 315 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 7d513db..8dd927e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 7fe86be..57613dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.io.IOException;
@@ -54,11 +55,11 @@ public final class AsyncFSOutputHelper {
* implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
- boolean createParent, short replication, long blockSize, final EventLoop eventLoop)
- throws IOException {
+ boolean createParent, short replication, long blockSize, EventLoop eventLoop,
+ Class<? extends Channel> channelClass) throws IOException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
- overwrite, createParent, replication, blockSize, eventLoop);
+ overwrite, createParent, replication, blockSize, eventLoop, channelClass);
}
final FSDataOutputStream fsOut;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index c64cdf7..9cc0ae0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import com.google.common.annotations.VisibleForTesting;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@@ -37,6 +39,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
@@ -71,8 +74,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* An asynchronous HDFS output stream implementation which fans out data to datanode and only
* supports writing file with only one block.
@@ -461,8 +462,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
for (int remaining = dataLen; remaining > 0;) {
int toWriteDataLen = Math.min(remaining, maxDataLen);
- combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock,
- syncBlock));
+ combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen),
+ nextSubPacketOffsetInBlock, syncBlock));
nextSubPacketOffsetInBlock += toWriteDataLen;
remaining -= toWriteDataLen;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 3eaacc4..d14d4d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -46,7 +46,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
@@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
-import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
@@ -607,7 +605,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
- BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
+ BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
+ Class<? extends Channel> channelClass) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
@@ -633,7 +632,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
Promise<Channel> promise = eventLoop.newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
- new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
+ new Bootstrap().group(eventLoop).channel(channelClass)
.option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
@Override
@@ -672,7 +671,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
- EventLoop eventLoop) throws IOException {
+ EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
Configuration conf = dfs.getConf();
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
DFSClient client = dfs.getClient();
@@ -701,7 +700,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
- PIPELINE_SETUP_CREATE, summer, eventLoop);
+ PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
@@ -741,14 +740,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
- EventLoop eventLoop) throws IOException {
+ EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
- blockSize, eventLoop);
+ blockSize, eventLoop, channelClass);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 4b06fab..fafc53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -18,20 +18,19 @@
package org.apache.hadoop.hbase.ipc;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.ServerChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
@@ -47,11 +46,12 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.util.JVM;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -69,57 +69,44 @@ public class NettyRpcServer extends RpcServer {
private final Channel serverChannel;
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- public NettyRpcServer(final Server server, final String name,
- final List<BlockingServiceAndInterface> services,
- final InetSocketAddress bindAddress, Configuration conf,
- RpcScheduler scheduler) throws IOException {
+ public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
+ InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler)
+ throws IOException {
super(server, name, services, bindAddress, conf, scheduler);
this.bindAddress = bindAddress;
- boolean useEpoll = useEpoll(conf);
- int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count",
- Runtime.getRuntime().availableProcessors() / 4);
- EventLoopGroup bossGroup = null;
- EventLoopGroup workerGroup = null;
- if (useEpoll) {
- bossGroup = new EpollEventLoopGroup(1);
- workerGroup = new EpollEventLoopGroup(workerCount);
+ EventLoopGroup eventLoopGroup;
+ Class<? extends ServerChannel> channelClass;
+ if (server instanceof HRegionServer) {
+ NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig();
+ eventLoopGroup = config.group();
+ channelClass = config.serverChannelClass();
} else {
- bossGroup = new NioEventLoopGroup(1);
- workerGroup = new NioEventLoopGroup(workerCount);
+ eventLoopGroup = new NioEventLoopGroup(0,
+ new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
+ channelClass = NioServerSocketChannel.class;
}
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup, workerGroup);
- if (useEpoll) {
- bootstrap.channel(EpollServerSocketChannel.class);
- } else {
- bootstrap.channel(NioServerSocketChannel.class);
- }
- bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
- bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
- bootstrap.childOption(ChannelOption.ALLOCATOR,
- PooledByteBufAllocator.DEFAULT);
- bootstrap.childHandler(new ChannelInitializer<Channel>() {
-
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
- preambleDecoder.setSingleDecode(true);
- pipeline.addLast("preambleDecoder", preambleDecoder);
- pipeline.addLast("preambleHandler", new NettyRpcServerPreambleHandler(NettyRpcServer.this));
- pipeline.addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
- pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
- pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
- }
- });
-
+ ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
+ .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+ .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
+ .childHandler(new ChannelInitializer<Channel>() {
+
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
+ preambleDecoder.setSingleDecode(true);
+ pipeline.addLast("preambleDecoder", preambleDecoder);
+ pipeline.addLast("preambleHandler",
+ new NettyRpcServerPreambleHandler(NettyRpcServer.this));
+ pipeline.addLast("frameDecoder",
+ new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
+ pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
+ pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
+ }
+ });
try {
serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
- LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress()
- + ", hbase.netty.rpc.server.worker.count=" + workerCount
- + ", useEpoll=" + useEpoll);
- allChannels.add(serverChannel);
+ LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
@@ -127,14 +114,6 @@ public class NettyRpcServer extends RpcServer {
this.scheduler.init(new RpcSchedulerContext(this));
}
- private static boolean useEpoll(Configuration conf) {
- // Config to enable native transport.
- boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport",
- true);
- // Use the faster native epoll transport mechanism on linux if enabled
- return epollEnabled && JVM.isLinux() && JVM.isAmd64();
- }
-
@Override
public synchronized void start() {
if (started) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3593ce6..986d6d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -18,6 +18,10 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
@@ -36,8 +40,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -106,7 +110,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -137,7 +141,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.*;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -153,9 +161,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -179,12 +184,14 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@@ -204,10 +211,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
import sun.misc.Signal;
import sun.misc.SignalHandler;
@@ -526,6 +529,8 @@ public class HRegionServer extends HasThread implements
protected FileSystemUtilizationChore fsUtilizationChore;
+ private final NettyEventLoopGroupConfig eventLoopGroupConfig;
+
/**
* Starts a HRegionServer at the default location.
*/
@@ -541,6 +546,13 @@ public class HRegionServer extends HasThread implements
super("RegionServer"); // thread name
this.fsOk = true;
this.conf = conf;
+ // initialize netty event loop group at the very beginning as we may use it to start rpc server,
+ // rpc client and WAL.
+ this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
+ NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
+ eventLoopGroupConfig.clientChannelClass());
+ NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
+ eventLoopGroupConfig.clientChannelClass());
MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf);
@@ -3740,4 +3752,8 @@ public class HRegionServer extends HasThread implements
public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
return this.rsSpaceQuotaManager;
}
+
+ public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
+ return eventLoopGroupConfig;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 69ca1c5..997591b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -24,6 +24,7 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
@@ -144,6 +145,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final EventLoop eventLoop;
+ private final Class<? extends Channel> channelClass;
+
private final Lock consumeLock = new ReentrantLock();
private final Runnable consumer = this::consume;
@@ -192,10 +195,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
- String prefix, String suffix, EventLoop eventLoop)
+ String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.eventLoop = eventLoop;
+ this.channelClass = channelClass;
Supplier<Boolean> hasConsumerTask;
if (eventLoop instanceof SingleThreadEventExecutor) {
@@ -607,7 +611,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
boolean overwrite = false;
for (int retry = 0;; retry++) {
try {
- return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop);
+ return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop,
+ channelClass);
} catch (RemoteException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
if (shouldRetryCreate(e)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e1f7b8f..f020d25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.io.IOException;
@@ -54,6 +55,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private final EventLoop eventLoop;
+ private final Class<? extends Channel> channelClass;
+
private AsyncFSOutput output;
private static final class OutputStreamWrapper extends OutputStream
@@ -99,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private OutputStream asyncOutputWrapper;
- public AsyncProtobufLogWriter(EventLoop eventLoop) {
+ public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
this.eventLoop = eventLoop;
+ this.channelClass = channelClass;
}
@Override
@@ -151,7 +155,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
- blockSize, eventLoop);
+ blockSize, eventLoop, channelClass);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
index 5a54e98..22c8aa8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@@ -32,8 +33,8 @@ public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
private Encryptor encryptor = null;
- public SecureAsyncProtobufLogWriter(EventLoop eventLoop) {
- super(eventLoop);
+ public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
+ super(eventLoop, channelClass);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
new file mode 100644
index 0000000..30caf72
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Event loop group related config.
+ */
+@InterfaceAudience.Private
+public class NettyEventLoopGroupConfig {
+
+ private final EventLoopGroup group;
+
+ private final Class<? extends ServerChannel> serverChannelClass;
+
+ private final Class<? extends Channel> clientChannelClass;
+
+ private static boolean useEpoll(Configuration conf) {
+ // Config to enable native transport.
+ boolean epollEnabled = conf.getBoolean("hbase.netty.nativetransport", true);
+ // Use the faster native epoll transport mechanism on linux if enabled
+ return epollEnabled && JVM.isLinux() && JVM.isAmd64();
+ }
+
+ public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) {
+ boolean useEpoll = useEpoll(conf);
+ int workerCount = conf.getInt("hbase.netty.worker.count", 0);
+ ThreadFactory eventLoopThreadFactory =
+ new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY);
+ if (useEpoll) {
+ group = new EpollEventLoopGroup(workerCount, eventLoopThreadFactory);
+ serverChannelClass = EpollServerSocketChannel.class;
+ clientChannelClass = EpollSocketChannel.class;
+ } else {
+ group = new NioEventLoopGroup(workerCount, eventLoopThreadFactory);
+ serverChannelClass = NioServerSocketChannel.class;
+ clientChannelClass = NioSocketChannel.class;
+ }
+ }
+
+ public EventLoopGroup group() {
+ return group;
+ }
+
+ public Class<? extends ServerChannel> serverChannelClass() {
+ return serverChannelClass;
+ }
+
+ public Class<? extends Channel> clientChannelClass() {
+ return clientChannelClass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 786f58a..2efa96d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.wal;
import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
@@ -36,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Pair;
/**
* A WAL provider that use {@link AsyncFSWAL}.
@@ -52,31 +55,43 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
}
- private EventLoopGroup eventLoopGroup = null;
+ private EventLoopGroup eventLoopGroup;
+ private Class<? extends Channel> channelClass;
@Override
protected AsyncFSWAL createWAL() throws IOException {
return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
- eventLoopGroup.next());
+ eventLoopGroup.next(), channelClass);
}
@Override
protected void doInit(Configuration conf) throws IOException {
- eventLoopGroup = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("AsyncFSWAL"));
+ Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+ NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+ if (eventLoopGroupAndChannelClass != null) {
+ eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+ channelClass = eventLoopGroupAndChannelClass.getSecond();
+ } else {
+ eventLoopGroup = new NioEventLoopGroup(1,
+ new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY));
+ channelClass = NioSocketChannel.class;
+ }
}
/**
* public because of AsyncFSWAL. Should be package-private
*/
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
- boolean overwritable, EventLoop eventLoop) throws IOException {
+ boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass)
+ throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
"hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
try {
- AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
+ AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class)
+ .newInstance(eventLoop, channelClass);
writer.init(fs, path, conf, overwritable);
return writer;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
new file mode 100644
index 0000000..273fc37
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper class for passing netty event loop config to {@link AsyncFSWALProvider}.
+ */
+public class NettyAsyncFSWALConfigHelper {
+
+ private static final String EVENT_LOOP_CONFIG = "hbase.wal.async.event-loop.config";
+
+ private static final String CONFIG_NAME = "global-event-loop";
+
+ private static final Map<String, Pair<EventLoopGroup, Class<? extends Channel>>> EVENT_LOOP_CONFIG_MAP =
+ new HashMap<>();
+
+ /**
+ * Set the EventLoopGroup and channel class for {@code AsyncFSWALProvider}.
+ */
+ public static void setEventLoopConfig(Configuration conf, EventLoopGroup group,
+ Class<? extends Channel> channelClass) {
+ Preconditions.checkNotNull(group, "group is null");
+ Preconditions.checkNotNull(channelClass, "channel class is null");
+ conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME);
+ EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME,
+ Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, channelClass));
+ }
+
+ static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) {
+ String name = conf.get(EVENT_LOOP_CONFIG);
+ if (StringUtils.isBlank(name)) {
+ return null;
+ }
+ return EVENT_LOOP_CONFIG_MAP.get(name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index f59133a..43a279e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -18,14 +18,17 @@
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -64,6 +67,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
private static EventLoopGroup EVENT_LOOP_GROUP;
+ private static Class<? extends Channel> CHANNEL_CLASS;
+
private static int READ_TIMEOUT_MS = 2000;
@Rule
@@ -75,6 +80,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
EVENT_LOOP_GROUP = new NioEventLoopGroup();
+ CHANNEL_CLASS = NioSocketChannel.class;
}
@AfterClass
@@ -91,9 +97,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// will fail.
for (;;) {
try {
- FanOutOneBlockAsyncDFSOutput out =
- FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
- true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next());
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+ new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
+ EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
out.close();
break;
} catch (IOException e) {
@@ -122,8 +128,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
- true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
writeAndVerify(eventLoop, FS, f, out);
}
@@ -131,8 +137,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testMaxByteBufAllocated() throws Exception {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
- true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
out.guess(5 * 1024);
assertEquals(8 * 1024, out.guess(5 * 1024));
assertEquals(16 * 1024, out.guess(10 * 1024));
@@ -146,9 +152,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
- true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
- final byte[] b = new byte[10];
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+ byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b, 0, b.length);
out.flush(false).get();
@@ -179,8 +185,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
- true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to datanode should still alive.
writeAndVerify(eventLoop, FS, f, out);
@@ -195,11 +201,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
- FS.getDefaultBlockSize(), eventLoop);
+ FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with parent does not exist");
} catch (RemoteException e) {
LOG.info("expected exception caught", e);
- assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
+ assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
}
}
@@ -220,7 +226,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
- FS.getDefaultBlockSize(), eventLoop);
+ FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with connection error");
} catch (IOException e) {
LOG.info("expected exception caught", e);
@@ -239,8 +245,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
- true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+ false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
byte[] b = new byte[50 * 1024 * 1024];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 6bd2d3c..4da778e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@@ -31,8 +33,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
-import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
@@ -44,6 +44,8 @@ public class TestLocalAsyncOutput {
private static EventLoopGroup GROUP = new NioEventLoopGroup();
+ private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
+
private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
@AfterClass
@@ -57,7 +59,7 @@ public class TestLocalAsyncOutput {
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
- fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
+ fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS);
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index e05d869..7e67a90 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -31,9 +31,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.File;
import java.io.IOException;
@@ -83,6 +85,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static EventLoopGroup EVENT_LOOP_GROUP;
+ private static Class<? extends Channel> CHANNEL_CLASS;
+
private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE =
@@ -166,6 +170,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
+ CHANNEL_CLASS = NioSocketChannel.class;
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
@@ -242,8 +247,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = getTestFile();
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
- true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop);
+ FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+ false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index a55df68..9b28975 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.util.List;
@@ -41,9 +43,12 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
private static EventLoopGroup GROUP;
+ private static Class<? extends Channel> CHANNEL_CLASS;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
+ CHANNEL_CLASS = NioSocketChannel.class;
AbstractTestFSWAL.setUpBeforeClass();
}
@@ -58,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
- suffix, GROUP.next());
+ suffix, GROUP.next(), CHANNEL_CLASS);
}
@Override
@@ -67,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
- suffix, GROUP.next()) {
+ suffix, GROUP.next(), CHANNEL_CLASS) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
index 72fc4b2..a689775 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -42,9 +44,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
private static EventLoopGroup EVENT_LOOP_GROUP;
+ private static Class<? extends Channel> CHANNEL_CLASS;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
+ CHANNEL_CLASS = NioSocketChannel.class;
AbstractTestProtobufLog.setUpBeforeClass();
}
@@ -57,7 +62,7 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
@Override
protected AsyncWriter createWriter(Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false,
- EVENT_LOOP_GROUP.next());
+ EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
index e008b37..17f58f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
@@ -40,9 +42,12 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
private static EventLoopGroup GROUP;
+ private static Class<? extends Channel> CHANNEL_CLASS;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay"));
+ CHANNEL_CLASS = NioSocketChannel.class;
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
AbstractTestWALReplay.setUpBeforeClass();
@@ -57,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
- HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next());
+ HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS);
}
}