Posted to dev@zookeeper.apache.org by ivmaykov <gi...@git.apache.org> on 2018/10/15 21:05:36 UTC

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

GitHub user ivmaykov opened a pull request:

    https://github.com/apache/zookeeper/pull/669

    ZOOKEEPER-3152: Port ZK netty stack to netty4

    Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
    
    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
    
    FB Reviewers: nixon

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/zookeeper/pull/669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #669
    
----
commit 34c3e275d012bd14c243633a638c85f1ca4a36c4
Author: Ilya Maykov <il...@...>
Date:   2018-08-31T23:26:55Z

    port ZK netty stack from netty3 to netty4
    
    Summary:
    Ported the client connection netty stack from netty3 to netty4. This includes both the server side
    (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
    
    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
    
    Reviewers: nixon, nwolchko, nedelchev
    
    Subscribers:
    
    Differential Revision: https://phabricator.intern.facebook.com/D9646262
    
    Tasks:
    
    Tags:
    
    Blame Revision:

----


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233289109
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +105,102 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return channel != null || connectFuture != null;
    +        } finally {
    +            connectLock.unlock();
    +        }
    +    }
    +
    +    private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
    +        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
    +        if (testAllocator != null) {
    +            return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
    +        } else {
    +            return bootstrap;
    +        }
         }
     
         @Override
         void connect(InetSocketAddress addr) throws IOException {
             firstConnect = new CountDownLatch(1);
     
    -        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
    -
    -        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    -        bootstrap.setOption("soLinger", -1);
    -        bootstrap.setOption("tcpNoDelay", true);
    -
    -        connectFuture = bootstrap.connect(addr);
    -        connectFuture.addListener(new ChannelFutureListener() {
    -            @Override
    -            public void operationComplete(ChannelFuture channelFuture) throws Exception {
    -                // this lock guarantees that channel won't be assgined after cleanup().
    -                connectLock.lock();
    -                try {
    -                    if (!channelFuture.isSuccess() || connectFuture == null) {
    -                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
    -                        return;
    -                    }
    -                    // setup channel, variables, connection, etc.
    -                    channel = channelFuture.getChannel();
    -
    -                    disconnected.set(false);
    -                    initialized = false;
    -                    lenBuffer.clear();
    -                    incomingBuffer = lenBuffer;
    -
    -                    sendThread.primeConnection();
    -                    updateNow();
    -                    updateLastSendAndHeard();
    -
    -                    if (sendThread.tunnelAuthInProgress()) {
    -                        waitSasl.drainPermits();
    -                        needSasl.set(true);
    -                        sendPrimePacket();
    -                    } else {
    -                        needSasl.set(false);
    -                    }
    +        Bootstrap bootstrap = new Bootstrap()
    +                .group(eventLoopGroup)
    +                .channel(NettyUtils.nioOrEpollSocketChannel())
    +                .option(ChannelOption.SO_LINGER, -1)
    +                .option(ChannelOption.TCP_NODELAY, true)
    +                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    +        bootstrap = configureBootstrapAllocator(bootstrap);
    +        bootstrap.validate();
     
    -                    // we need to wake up on first connect to avoid timeout.
    -                    wakeupCnxn();
    -                    firstConnect.countDown();
    -                    LOG.info("channel is connected: {}", channelFuture.getChannel());
    -                } finally {
    -                    connectLock.unlock();
    +        connectLock.lock();
    +        try {
    +            connectFuture = bootstrap.connect(addr);
    +            connectFuture.addListener(new ChannelFutureListener() {
    +                @Override
    +                public void operationComplete(ChannelFuture channelFuture) throws Exception {
    +                    // this lock guarantees that channel won't be assigned after cleanup().
    +                    connectLock.lock();
    +                    try {
    +                        if (!channelFuture.isSuccess()) {
    +                            LOG.info("future isn't success, cause:", channelFuture.cause());
    +                            return;
    +                        } else if (connectFuture == null) {
    --- End diff --
    
    How could `connectFuture` be null?
    `connectFuture.addListener` call would have already thrown NPE in that case.
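
One possible answer, sketched with JDK types only: the listener does not run at registration time. It runs when the connect attempt finishes, and the field can be cleared in between. The sketch assumes that `cleanup()` sets `connectFuture` to null under `connectLock`; that method is not shown in this diff. `ConnectRaceSketch`, the `String` standing in for a channel, and `closedLateChannel` are hypothetical names, and `CompletableFuture` stands in for Netty's `ChannelFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the client socket; a String plays the role of a Channel.
final class ConnectRaceSketch {
    private final ReentrantLock connectLock = new ReentrantLock();
    private CompletableFuture<String> connectFuture; // guarded by connectLock
    private String channel;                          // guarded by connectLock
    private boolean closedLateChannel;               // guarded by connectLock

    CompletableFuture<String> connect() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        connectLock.lock();
        try {
            connectFuture = pending; // non-null while the callback is registered
            pending.whenComplete((ch, err) -> {
                // Runs when the attempt finishes; the field may have changed by then.
                connectLock.lock();
                try {
                    if (err != null) {
                        return; // the connect attempt failed
                    } else if (connectFuture == null) {
                        // Cancelled, but the attempt succeeded anyway:
                        // close the late channel instead of adopting it.
                        closedLateChannel = true;
                        return;
                    }
                    channel = ch;
                } finally {
                    connectLock.unlock();
                }
            });
        } finally {
            connectLock.unlock();
        }
        return pending;
    }

    // Assumed behaviour of cleanup(): drop the pending future and the channel.
    void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    String channel() {
        connectLock.lock();
        try {
            return channel;
        } finally {
            connectLock.unlock();
        }
    }

    boolean closedLateChannel() {
        connectLock.lock();
        try {
            return closedLateChannel;
        } finally {
            connectLock.unlock();
        }
    }

    public static void main(String[] args) {
        ConnectRaceSketch s = new ConnectRaceSketch();
        CompletableFuture<String> attempt = s.connect();
        s.cleanup();              // cancel before the attempt completes
        attempt.complete("ch-1"); // the attempt succeeds anyway
        System.out.println("channel=" + s.channel() + " closedLate=" + s.closedLateChannel());
    }
}
```

In this sketch the null check inside the callback is the only thing that stops a cancelled attempt from installing a channel after `cleanup()` has run. Whether the real code can reach that branch depends on how `cleanup()` is written.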


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226685450
  
    --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.common;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.buffer.PooledByteBufAllocator;
    +import io.netty.util.ResourceLeakDetector;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * This is a custom ByteBufAllocator that tracks outstanding allocations and
    + * crashes the program if any of them are leaked.
    + *
    + * Never use this class in production, it will cause your server to run out
    + * of memory! This is because it holds strong references to all allocated
    + * buffers and doesn't release them until checkForLeaks() is called at the
    + * end of a unit test.
    + *
    + * Note: the original code was copied from https://github.com/airlift/drift,
    + * with the permission and encouragement of airlift's author (dain). Airlift
    + * uses the same apache 2.0 license as Zookeeper so this should be ok.
    + *
    + * However, the code was modified to take advantage of Netty's built-in
    + * leak tracking and make a best effort to print details about buffer leaks.
    + */
    +public class TestByteBufAllocator extends PooledByteBufAllocator {
    --- End diff --
    
    This is interesting.
    
    Netty already has built-in support for this kind of thing, though I see that this class is smarter.
    Isn't running the tests with paranoid leak detection enough?
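
For context on the question, the idea described in the Javadoc above can be sketched without Netty. Netty's leak detector only notices a leaked buffer once the buffer has been garbage collected, so its reports depend on GC timing. An allocator that keeps strong references can check deterministically at the end of each test. The sketch below is not the class from the PR: `TrackingAllocatorSketch`, `CountedBuf`, and the use of a reference count to decide what counts as leaked are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical allocator that remembers every buffer it hands out.
final class TrackingAllocatorSketch {

    // Hypothetical reference-counted buffer; stands in for Netty's ByteBuf.
    static final class CountedBuf {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        final String label;

        CountedBuf(String label) {
            this.label = label;
        }

        int refCnt() {
            return refCnt.get();
        }

        void release() {
            refCnt.decrementAndGet();
        }
    }

    private final List<CountedBuf> tracked = new ArrayList<>();

    synchronized CountedBuf buffer(String label) {
        CountedBuf buf = new CountedBuf(label);
        // Strong reference: nothing is freed until checkForLeaks(),
        // which is why this pattern is for tests only.
        tracked.add(buf);
        return buf;
    }

    // Call at the end of a test: fails if any buffer is still referenced.
    synchronized void checkForLeaks() {
        List<String> leaked = new ArrayList<>();
        for (CountedBuf buf : tracked) {
            if (buf.refCnt() > 0) {
                leaked.add(buf.label);
            }
        }
        tracked.clear();
        if (!leaked.isEmpty()) {
            throw new AssertionError("leaked buffers: " + leaked);
        }
    }

    public static void main(String[] args) {
        TrackingAllocatorSketch allocator = new TrackingAllocatorSketch();
        allocator.buffer("released").release();
        allocator.checkForLeaks(); // passes: every buffer was released
        allocator.buffer("forgotten");
        try {
            allocator.checkForLeaks();
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The trade-off is the one the Javadoc warns about: the strong references make the check reliable, but memory grows until `checkForLeaks()` is called.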


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @anmolnar does anything else need to be done with this PR before it can be merged?


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
GitHub user ivmaykov reopened a pull request:

    https://github.com/apache/zookeeper/pull/669

    ZOOKEEPER-3152: Port ZK netty stack to netty4

    Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
    
    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
    
    FB Reviewers: nixon

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/zookeeper/pull/669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #669
    
----
commit 4fb8eb6ebe69a4f9a0852624d652d0893d6ba625
Author: Ilya Maykov <il...@...>
Date:   2018-08-31T23:26:55Z

    port ZK netty stack from netty3 to netty4
    
    Summary:
    Ported the client connection netty stack from netty3 to netty4. This includes both the server side
    (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
    
    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
    
    Reviewers: nixon, nwolchko, nedelchev
    
    Subscribers:
    
    Differential Revision: https://phabricator.intern.facebook.com/D9646262
    
    Tasks:
    
    Tags:
    
    Blame Revision:

----


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2473/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226839154
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
             p.createBB();
             updateLastSend();
             sentCount++;
    -        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    +        channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
    --- End diff --
    
    Essentially you will save allocations if you don't need listeners


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    Cleaner Epoll/Nio selection code


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226682998
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +108,95 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return connectFuture != null || channel != null;
    +        } finally {
    +            connectLock.unlock();
    +        }
         }
     
         @Override
         void connect(InetSocketAddress addr) throws IOException {
             firstConnect = new CountDownLatch(1);
     
    -        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
    -
    -        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    -        bootstrap.setOption("soLinger", -1);
    -        bootstrap.setOption("tcpNoDelay", true);
    -
    -        connectFuture = bootstrap.connect(addr);
    -        connectFuture.addListener(new ChannelFutureListener() {
    -            @Override
    -            public void operationComplete(ChannelFuture channelFuture) throws Exception {
    -                // this lock guarantees that channel won't be assgined after cleanup().
    -                connectLock.lock();
    -                try {
    -                    if (!channelFuture.isSuccess() || connectFuture == null) {
    -                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
    -                        return;
    -                    }
    -                    // setup channel, variables, connection, etc.
    -                    channel = channelFuture.getChannel();
    -
    -                    disconnected.set(false);
    -                    initialized = false;
    -                    lenBuffer.clear();
    -                    incomingBuffer = lenBuffer;
    -
    -                    sendThread.primeConnection();
    -                    updateNow();
    -                    updateLastSendAndHeard();
    -
    -                    if (sendThread.tunnelAuthInProgress()) {
    -                        waitSasl.drainPermits();
    -                        needSasl.set(true);
    -                        sendPrimePacket();
    -                    } else {
    -                        needSasl.set(false);
    -                    }
    +        Bootstrap bootstrap = new Bootstrap();
    +        bootstrap.group(Objects.requireNonNull(eventLoopGroup))
    +                .channel(NioSocketChannel.class)
    +                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()))
    +                .option(ChannelOption.SO_LINGER, -1)
    +                .option(ChannelOption.TCP_NODELAY, true);
    +        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
    +        if (testAllocator != null) {
    +            bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
    +        }
    +        bootstrap.validate();
    +
    +        connectLock.lock();
    +        try {
    +            connectFuture = bootstrap.connect(addr);
    +            connectFuture.addListener(new ChannelFutureListener() {
    +                @Override
    +                public void operationComplete(ChannelFuture channelFuture) throws Exception {
    +                    // this lock guarantees that channel won't be assigned after cleanup().
    +                    connectLock.lock();
    +                    try {
    +                        if (!channelFuture.isSuccess()) {
    +                            LOG.info("future isn't success, cause:", channelFuture.cause());
    +                            return;
    +                        } else if (connectFuture == null) {
    +                            LOG.info("connect attempt cancelled");
    +                            // If the connect attempt was cancelled but succeeded
    +                            // anyway, make sure to close the channel, otherwise
    +                            // we may leak a file descriptor.
    +                            channelFuture.channel().close();
    --- End diff --
    
    Can this turn into an NPE? `channel()` may return null.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226838972
  
    --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.common;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.buffer.PooledByteBufAllocator;
    +import io.netty.util.ResourceLeakDetector;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * This is a custom ByteBufAllocator that tracks outstanding allocations and
    + * crashes the program if any of them are leaked.
    + *
    + * Never use this class in production, it will cause your server to run out
    + * of memory! This is because it holds strong references to all allocated
    + * buffers and doesn't release them until checkForLeaks() is called at the
    + * end of a unit test.
    + *
    + * Note: the original code was copied from https://github.com/airlift/drift,
    + * with the permission and encouragement of airlift's author (dain). Airlift
    + * uses the same apache 2.0 license as Zookeeper so this should be ok.
    + *
    + * However, the code was modified to take advantage of Netty's built-in
    + * leak tracking and make a best effort to print details about buffer leaks.
    + */
    +public class TestByteBufAllocator extends PooledByteBufAllocator {
    --- End diff --
    
    Okay, very good


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233664524
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    yep!
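
The pattern under discussion can be sketched with an `AtomicReference` in place of the channel attribute. `channelInactive()` and `exceptionCaught()` both take the connection with `getAndSet(null)`, so only the first of them closes it. A later user event reads the attribute with `get()` and therefore has to tolerate null. `CnxnAttributeSketch`, `Cnxn`, and `resumeReads()` are hypothetical names; what the real handler does on `AutoReadEvent.ENABLE` is not visible in this diff.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical handler; the AtomicReference plays the role of
// ctx.channel().attr(CONNECTION_ATTRIBUTE).
final class CnxnAttributeSketch {

    // Hypothetical connection object that counts what is done to it.
    static final class Cnxn {
        final AtomicInteger closeCalls = new AtomicInteger();
        final AtomicInteger resumeCalls = new AtomicInteger();

        void close() {
            closeCalls.incrementAndGet();
        }

        void resumeReads() {
            resumeCalls.incrementAndGet();
        }
    }

    private final AtomicReference<Cnxn> attr = new AtomicReference<>();

    void channelActive(Cnxn cnxn) {
        attr.set(cnxn);
    }

    void channelInactive() {
        closeOnce();
    }

    void exceptionCaught() {
        closeOnce();
    }

    // getAndSet(null) hands the connection to exactly one caller.
    private void closeOnce() {
        Cnxn cnxn = attr.getAndSet(null);
        if (cnxn != null) {
            cnxn.close();
        }
    }

    // Returns false when the connection is already gone.
    boolean userEventTriggered() {
        Cnxn cnxn = attr.get();
        if (cnxn == null) {
            return false;
        }
        cnxn.resumeReads();
        return true;
    }

    public static void main(String[] args) {
        CnxnAttributeSketch handler = new CnxnAttributeSketch();
        Cnxn cnxn = new Cnxn();
        handler.channelActive(cnxn);
        handler.exceptionCaught();
        handler.channelInactive(); // second call finds null and does nothing
        System.out.println("close calls: " + cnxn.closeCalls.get());
        System.out.println("event handled after close: " + handler.userEventTriggered());
    }
}
```

Under these assumptions a null check in `userEventTriggered()` is enough; no extra locking is needed, because whichever handler clears the attribute first is the one that closes the connection.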


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226690548
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -335,29 +260,34 @@ public void operationComplete(ChannelFuture future)
         CnxnChannelHandler channelHandler = new CnxnChannelHandler();
     
         NettyServerCnxnFactory() {
    -        bootstrap = new ServerBootstrap(
    -                new NioServerSocketChannelFactory(
    -                        Executors.newCachedThreadPool(),
    -                        Executors.newCachedThreadPool()));
    -        // parent channel
    -        bootstrap.setOption("reuseAddress", true);
    -        // child channels
    -        bootstrap.setOption("child.tcpNoDelay", true);
    -        /* set socket linger to off, so that socket close does not block */
    -        bootstrap.setOption("child.soLinger", -1);
    -        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    -            @Override
    -            public ChannelPipeline getPipeline() throws Exception {
    -                ChannelPipeline p = Channels.pipeline();
    -                if (secure) {
    -                    initSSL(p);
    -                }
    -                p.addLast("servercnxnfactory", channelHandler);
    -
    -                return p;
    -            }
    -        });
             x509Util = new ClientX509Util();
    +
    +        EventLoopGroup bossGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
    --- End diff --
    
    Consider EPoll


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226835980
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
             p.createBB();
             updateLastSend();
             sentCount++;
    -        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    +        channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
    --- End diff --
    
    See these very interesting slides 
    http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#8.0


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @dain take a look at the changes I made to airlift's test ByteBuf allocator. With these changes, we (sometimes) get leak details printed to stderr if a ByteBuf leaks, before the test crashes. Feel free to incorporate the approach into airlift if you want.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by maoling <gi...@git.apache.org>.
Github user maoling commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    I found a [blog post](https://blog.twitter.com/engineering/en_us/a/2013/netty-4-at-twitter-reduced-gc-overhead.html) about Twitter's experience moving Finagle from netty3 to netty4. It would be great to provide a benchmark like this:
    
    > 5 times less frequent GC pauses: 45.5 vs. 9.2 times/min
    > 5 times less garbage production: 207.11 vs 41.81 MiB/s


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233282137
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
                         if (KeeperException.Code.OK !=
                                 authProvider.handleAuthentication(cnxn, null)) {
                             LOG.error("Authentication failed for session 0x{}",
    -                                Long.toHexString(cnxn.sessionId));
    +                                Long.toHexString(cnxn.getSessionId()));
                             cnxn.close();
                             return;
                         }
     
    -                    allChannels.add(future.getChannel());
    +                    final Channel futureChannel = future.getNow();
    --- End diff --
    
    I think `get()` would be enough, but the check is harmless anyway.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @eolivelli thanks so much for the review! See my responses inline.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov closed the pull request at:

    https://github.com/apache/zookeeper/pull/669


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226757190
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
    -                if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    -                }
    +                LOG.trace("Channel inactive caused close {}", cnxn);
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    -                }
    +                LOG.debug("Closing {}", cnxn);
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    +                    // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
    +                    // after either of those. Check for null just to be safe ...
    +                    if (cnxn != null) {
    +                        cnxn.processQueuedBuffer();
    +                    }
    +                    ctx.channel().config().setAutoRead(true);
    +                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
    +                    LOG.debug("Received AutoReadEvent.DISABLE");
    +                    ctx.channel().config().setAutoRead(false);
                     }
    -            } catch(Exception ex) {
    -                LOG.error("Unexpected exception in receive", ex);
    -                throw ex;
    +            } finally {
    +                ReferenceCountUtil.release(evt);
                 }
             }
     
    -        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
    -            if (LOG.isDebugEnabled()) {
    -                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
    -                        + cnxn.queuedBuffer);
    -            }
    -
    -            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
    -                LOG.debug("Received ResumeMessageEvent");
    -                if (cnxn.queuedBuffer != null) {
    -                    if (LOG.isTraceEnabled()) {
    -                        LOG.trace("processing queue "
    -                                + Long.toHexString(cnxn.sessionId)
    -                                + " queuedBuffer 0x"
    -                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                    }
    -                    cnxn.receiveMessage(cnxn.queuedBuffer);
    -                    if (!cnxn.queuedBuffer.readable()) {
    -                        LOG.debug("Processed queue - no bytes remaining");
    -                        cnxn.queuedBuffer = null;
    -                    } else {
    -                        LOG.debug("Processed queue - bytes remaining");
    -                    }
    -                } else {
    -                    LOG.debug("queue empty");
    -                }
    -                cnxn.channel.setReadable(true);
    -            } else {
    -                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
    -                if (LOG.isTraceEnabled()) {
    -                    LOG.trace(Long.toHexString(cnxn.sessionId)
    -                            + " buf 0x"
    -                            + ChannelBuffers.hexDump(buf));
    -                }
    -                
    -                if (cnxn.throttled) {
    -                    LOG.debug("Received message while throttled");
    -                    // we are throttled, so we need to queue
    -                    if (cnxn.queuedBuffer == null) {
    -                        LOG.debug("allocating queue");
    -                        cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
    -                    }
    -                    cnxn.queuedBuffer.writeBytes(buf);
    -                    if (LOG.isTraceEnabled()) {
    -                        LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                + " queuedBuffer 0x"
    -                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                    }
    -                } else {
    -                    LOG.debug("not throttled");
    -                    if (cnxn.queuedBuffer != null) {
    -                        if (LOG.isTraceEnabled()) {
    -                            LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                    + " queuedBuffer 0x"
    -                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                        }
    -                        cnxn.queuedBuffer.writeBytes(buf);
    -                        if (LOG.isTraceEnabled()) {
    -                            LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                    + " queuedBuffer 0x"
    -                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                        }
    -
    -                        cnxn.receiveMessage(cnxn.queuedBuffer);
    -                        if (!cnxn.queuedBuffer.readable()) {
    -                            LOG.debug("Processed queue - no bytes remaining");
    -                            cnxn.queuedBuffer = null;
    -                        } else {
    -                            LOG.debug("Processed queue - bytes remaining");
    -                        }
    +        @Override
    +        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    +            try {
    +                LOG.trace("message received called {}", msg);
    --- End diff --
    
    as above.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233279457
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    Do you need to remove `cnxn` from the channel in the mentioned two events?
    Null check wouldn't do any harm though.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226689302
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    --- End diff --
    
    Is the `isTraceEnabled` check missing here?


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226756565
  
    --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.common;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.buffer.PooledByteBufAllocator;
    +import io.netty.util.ResourceLeakDetector;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * This is a custom ByteBufAllocator that tracks outstanding allocations and
    + * crashes the program if any of them are leaked.
    + *
    + * Never use this class in production, it will cause your server to run out
    + * of memory! This is because it holds strong references to all allocated
    + * buffers and doesn't release them until checkForLeaks() is called at the
    + * end of a unit test.
    + *
    + * Note: the original code was copied from https://github.com/airlift/drift,
    + * with the permission and encouragement of airlift's author (dain). Airlift
    + * uses the same apache 2.0 license as Zookeeper so this should be ok.
    + *
    + * However, the code was modified to take advantage of Netty's built-in
    + * leak tracking and make a best effort to print details about buffer leaks.
    + */
    +public class TestByteBufAllocator extends PooledByteBufAllocator {
    --- End diff --
    
    Paranoid leak detection will just print details about leaks, but the test will still pass and it takes a human to examine the test's stderr output to see that there was a problem. Using this allocator will make the test fail if there are buffer leaks.
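
The difference between "log the leak" and "fail the test" can be sketched without Netty. The following is a minimal illustration only, not the PR's `TestByteBufAllocator`: `TrackingBufferPool` and its methods are hypothetical names, and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`. It keeps strong references to every outstanding buffer and throws from `checkForLeaks()` if any remain.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical test-only pool: tracks every buffer it hands out.
final class TrackingBufferPool {
    // Identity-based set, because ByteBuffer.equals() compares contents,
    // and two distinct buffers with equal contents must both be tracked.
    private final Set<ByteBuffer> outstanding =
            Collections.newSetFromMap(new IdentityHashMap<ByteBuffer, Boolean>());

    synchronized ByteBuffer allocate(int capacity) {
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        outstanding.add(buf);
        return buf;
    }

    synchronized void release(ByteBuffer buf) {
        if (!outstanding.remove(buf)) {
            throw new IllegalStateException("buffer released twice or not allocated by this pool");
        }
    }

    // Call at the end of a test: throws (and so fails the test) if anything leaked,
    // then resets the tracking state for the next test.
    synchronized void checkForLeaks() {
        if (!outstanding.isEmpty()) {
            int leaked = outstanding.size();
            outstanding.clear();
            throw new AssertionError(leaked + " buffer(s) were never released");
        }
    }
}
```

Because the pool holds strong references until `checkForLeaks()` runs, a structure like this is only suitable for tests, which matches the warning in the class Javadoc above.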


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by maoling <gi...@git.apache.org>.
Github user maoling commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    - Where can we find a benchmark comparing netty4 vs netty3? I am testing the performance.
    - I have seen many cases of memory leaks when migrating from netty3 to netty4, caused by misunderstanding netty4's new design. I am worried about this.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    Rebase, update localAddress after accepting connection and log it


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226755668
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -184,7 +213,9 @@ void cleanup() {
     
         @Override
         void close() {
    -        channelFactory.releaseExternalResources();
    +        if (!eventLoopGroup.isShuttingDown()) {
    --- End diff --
    
    I'm not sure if calling `shutdownGracefully` more than once is allowed, which is why I added the check. It might not be necessary.
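
When it is unclear whether a shutdown method tolerates repeated calls, one option is to make `close()` itself idempotent. The sketch below is an illustration under assumptions, not the PR's code: `CloseOnceSketch` is a hypothetical class, and a JDK `ExecutorService` stands in for the Netty `eventLoopGroup`. It uses `compareAndSet` so that only the first caller performs the shutdown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical owner of an executor that must be shut down at most once.
final class CloseOnceSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Returns true only for the caller that actually triggered the shutdown.
    boolean close() {
        if (!closed.compareAndSet(false, true)) {
            return false; // someone else already closed us
        }
        executor.shutdown();
        return true;
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }
}
```

Compared with a check such as `if (!isShuttingDown()) { shutdown(); }`, the atomic flag leaves no window in which two threads can both pass the check.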


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by enixon <gi...@git.apache.org>.
Github user enixon commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @ivmaykov (or anyone with a Stack Overflow account, really), do you want to tag a pointer to this PR with 'netty' on Stack Overflow (as per https://netty.io/community.html) and see if any of the project experts want to weigh in on this port?



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2463/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226845944
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    --- End diff --
    
    Do you want a similar check for LOG.debug() calls as well, or only LOG.trace()?
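
For context on the guard being discussed: with `{}` placeholders the argument's `toString()` is deferred until the logger has decided to emit the message, whereas string concatenation formats eagerly. The sketch below illustrates that difference with a hypothetical stand-in logger (`LazyFormatSketch` is not SLF4J and only mimics the single-placeholder case).

```java
// Hypothetical mini-logger, used only to show when toString() runs.
final class LazyFormatSketch {
    // Argument type that counts how often it is converted to a string.
    static final class Counted {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "channel";
        }
    }

    private final boolean traceEnabled;

    LazyFormatSketch(boolean traceEnabled) {
        this.traceEnabled = traceEnabled;
    }

    // Placeholder style: the argument is formatted only if tracing is enabled.
    String trace(String format, Object arg) {
        if (!traceEnabled) {
            return null;
        }
        return format.replace("{}", String.valueOf(arg));
    }

    // Pre-built message style: the caller has already paid for formatting.
    String trace(String message) {
        return traceEnabled ? message : null;
    }
}
```

Under this model an explicit `isTraceEnabled()` guard mainly pays off when computing the argument itself is expensive, for example a hex dump of a buffer.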


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by normanmaurer <gi...@git.apache.org>.
Github user normanmaurer commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    Also @eolivelli 


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226684209
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
             p.createBB();
             updateLastSend();
             sentCount++;
    -        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    +        channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
    --- End diff --
    
    What about passing `channel.voidPromise()` as the second argument?


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by normanmaurer <gi...@git.apache.org>.
Github user normanmaurer commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @ivmaykov I would be happy to review your code changes once you think they are ready. While I don't know a lot about ZooKeeper internals, I know a few things about Netty ;) Just ping me please.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2484/



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2482/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233650367
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    I do remove it in both places, by calling `ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null)`. The getAndSet(null) will atomically return the old value of the attribute and set the new value to null.
    
    Now that I think about it, I'm not sure if we need to remove the attribute in `exceptionCaught()` ... we can probably leave it and let `channelInactive()` take care of removing the attribute. Let me know if you want me to make this change; I think it probably doesn't matter too much either way.
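
The `getAndSet(null)` hand-off described above can be sketched with JDK classes. This is an illustration under assumptions: `java.util.concurrent.atomic.AtomicReference` stands in for the Netty channel attribute, and `FakeCnxn` and `GetAndSetSketch` are hypothetical names. Whichever handler swaps the reference out first receives the connection and closes it; any later caller sees `null`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for NettyServerCnxn: only counts close() calls.
final class FakeCnxn {
    final AtomicInteger closeCount = new AtomicInteger();

    void close() {
        closeCount.incrementAndGet();
    }
}

final class GetAndSetSketch {
    // Stand-in for ctx.channel().attr(CONNECTION_ATTRIBUTE).
    private final AtomicReference<FakeCnxn> attr = new AtomicReference<>();

    void attach(FakeCnxn cnxn) {
        attr.set(cnxn);
    }

    // Same shape as channelInactive() / exceptionCaught() in the diff:
    // atomically take the old value and leave null behind.
    boolean detachAndClose() {
        FakeCnxn cnxn = attr.getAndSet(null);
        if (cnxn == null) {
            return false; // already detached by the other handler
        }
        cnxn.close();
        return true;
    }

    // Same shape as userEventTriggered(): a plain get(), so the result
    // can be null once either handler above has run.
    boolean isAttached() {
        return attr.get() != null;
    }
}
```

In this model the connection is closed exactly once even if both handlers fire, which is also why a reader that uses plain `get()` needs the null check.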


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226835913
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +108,95 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return connectFuture != null || channel != null;
    +        } finally {
    +            connectLock.unlock();
    +        }
         }
     
         @Override
         void connect(InetSocketAddress addr) throws IOException {
             firstConnect = new CountDownLatch(1);
     
    -        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
    -
    -        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    -        bootstrap.setOption("soLinger", -1);
    -        bootstrap.setOption("tcpNoDelay", true);
    -
    -        connectFuture = bootstrap.connect(addr);
    -        connectFuture.addListener(new ChannelFutureListener() {
    -            @Override
    -            public void operationComplete(ChannelFuture channelFuture) throws Exception {
    -                // this lock guarantees that channel won't be assgined after cleanup().
    -                connectLock.lock();
    -                try {
    -                    if (!channelFuture.isSuccess() || connectFuture == null) {
    -                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
    -                        return;
    -                    }
    -                    // setup channel, variables, connection, etc.
    -                    channel = channelFuture.getChannel();
    -
    -                    disconnected.set(false);
    -                    initialized = false;
    -                    lenBuffer.clear();
    -                    incomingBuffer = lenBuffer;
    -
    -                    sendThread.primeConnection();
    -                    updateNow();
    -                    updateLastSendAndHeard();
    -
    -                    if (sendThread.tunnelAuthInProgress()) {
    -                        waitSasl.drainPermits();
    -                        needSasl.set(true);
    -                        sendPrimePacket();
    -                    } else {
    -                        needSasl.set(false);
    -                    }
    +        Bootstrap bootstrap = new Bootstrap();
    +        bootstrap.group(Objects.requireNonNull(eventLoopGroup))
    +                .channel(NioSocketChannel.class)
    +                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()))
    +                .option(ChannelOption.SO_LINGER, -1)
    +                .option(ChannelOption.TCP_NODELAY, true);
    +        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
    +        if (testAllocator != null) {
    +            bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
    +        }
    +        bootstrap.validate();
    +
    +        connectLock.lock();
    +        try {
    +            connectFuture = bootstrap.connect(addr);
    +            connectFuture.addListener(new ChannelFutureListener() {
    +                @Override
    +                public void operationComplete(ChannelFuture channelFuture) throws Exception {
    +                    // this lock guarantees that channel won't be assigned after cleanup().
    +                    connectLock.lock();
    +                    try {
    +                        if (!channelFuture.isSuccess()) {
    +                            LOG.info("future isn't success, cause:", channelFuture.cause());
    +                            return;
    +                        } else if (connectFuture == null) {
    +                            LOG.info("connect attempt cancelled");
    +                            // If the connect attempt was cancelled but succeeded
    +                            // anyway, make sure to close the channel, otherwise
    +                            // we may leak a file descriptor.
    +                            channelFuture.channel().close();
    --- End diff --
    
    That was my guess too. Let's keep it single, so it's okay with me.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2443/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233659921
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
                         if (KeeperException.Code.OK !=
                                 authProvider.handleAuthentication(cnxn, null)) {
                             LOG.error("Authentication failed for session 0x{}",
    -                                Long.toHexString(cnxn.sessionId));
    +                                Long.toHexString(cnxn.getSessionId()));
                             cnxn.close();
                             return;
                         }
     
    -                    allChannels.add(future.getChannel());
    +                    final Channel futureChannel = future.getNow();
    --- End diff --
    
    It's fine.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2442/



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2444/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226839114
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    --- End diff --
    
    It is better to add the 'if', because in general you skip calling the logger method and everything that comes with it: evaluating the parameter expressions, passing the parameters, and making the call itself.
    You pay for a single cheap method call and avoid a potential waste of resources and useless allocations.
    IMHO it is better to use this pattern consistently throughout the code.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226836043
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
                     }
                 }
                 wakeupCnxn();
    +            // Note: SimpleChannelInboundHandler releases the ByteBuf for us
    +            // so we don't need to do it.
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx,
    -                                    ExceptionEvent e) throws Exception {
    -            LOG.warn("Exception caught: {}", e, e.getCause());
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    +            LOG.warn("Exception caught", cause);
                 cleanup();
             }
         }
    +
    +    /**
    +     * Sets the test ByteBufAllocator. This allocator will be used by all
    +     * future instances of this class.
    +     * It is not recommended to use this method outside of testing.
    +     * @param allocator the ByteBufAllocator to use for all netty buffer
    +     *                  allocations.
    +     */
    +    public static void setTestAllocator(ByteBufAllocator allocator) {
    +        TEST_ALLOCATOR.set(allocator);
    --- End diff --
    
    Yes, in my case this is a hole, because the client code runs in a potentially insecure JVM where user code can call public methods.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226683922
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -184,7 +213,9 @@ void cleanup() {
     
         @Override
         void close() {
    -        channelFactory.releaseExternalResources();
    +        if (!eventLoopGroup.isShuttingDown()) {
    --- End diff --
    
    Is this really needed?


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226757057
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    --- End diff --
    
    LOG.trace() does an isTraceEnabled check internally. If the additional parameters passed to the log method don't do any work (such as converting the contents of a buffer to a hex string), then the enclosing isTraceEnabled check is redundant.
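
The trade-off discussed here can be sketched in a few lines. This is only an illustration under stated assumptions: it uses java.util.logging as a stand-in for the SLF4J logger in the patch, and `TraceGuardSketch`, `hexDump` and `logChannelEvent` are hypothetical names that do not appear in ZooKeeper.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative stand-in: java.util.logging instead of the SLF4J logger used in the patch.
final class TraceGuardSketch {
    // Counts how often the costly argument is actually built.
    static int expensiveCalls = 0;

    // Hypothetical expensive helper, e.g. dumping a buffer as hex.
    static String hexDump(byte[] data) {
        expensiveCalls++;
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    static void logChannelEvent(Logger log, Object channel, byte[] payload) {
        // Cheap argument: passing a reference does no work, and the logger
        // checks its own level before formatting, so no guard is needed.
        log.log(Level.FINEST, "Channel inactive {0}", channel);

        // Costly argument: hexDump() would run before the logger could
        // reject the message, so the explicit level check pays for itself.
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Payload {0}", hexDump(payload));
        }
    }
}
```

With the level above FINEST, neither statement formats anything and `hexDump` is never called; the guard only matters for the second statement.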


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2485/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226756353
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
                     }
                 }
                 wakeupCnxn();
    +            // Note: SimpleChannelInboundHandler releases the ByteBuf for us
    +            // so we don't need to do it.
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx,
    -                                    ExceptionEvent e) throws Exception {
    -            LOG.warn("Exception caught: {}", e, e.getCause());
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    +            LOG.warn("Exception caught", cause);
                 cleanup();
             }
         }
    +
    +    /**
    +     * Sets the test ByteBufAllocator. This allocator will be used by all
    +     * future instances of this class.
    +     * It is not recommended to use this method outside of testing.
    +     * @param allocator the ByteBufAllocator to use for all netty buffer
    +     *                  allocations.
    +     */
    +    public static void setTestAllocator(ByteBufAllocator allocator) {
    +        TEST_ALLOCATOR.set(allocator);
    --- End diff --
    
    Sure, but that would only affect that client and would have no effect on the server. All clients are untrusted by default since they run on a computer you don't own :)
    
    I can look into using a mocking framework instead if you feel strongly about it.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233286512
  
    --- Diff: zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.buffer.PooledByteBufAllocator;
    +import io.netty.util.ResourceLeakDetector;
    +
    +/**
    + * This is a custom ByteBufAllocator that tracks outstanding allocations and
    + * crashes the program if any of them are leaked.
    + *
    + * Never use this class in production, it will cause your server to run out
    + * of memory! This is because it holds strong references to all allocated
    + * buffers and doesn't release them until checkForLeaks() is called at the
    + * end of a unit test.
    + *
    + * Note: the original code was copied from https://github.com/airlift/drift,
    + * with the permission and encouragement of airlift's author (dain). Airlift
    + * uses the same apache 2.0 license as Zookeeper so this should be ok.
    + *
    + * However, the code was modified to take advantage of Netty's built-in
    + * leak tracking and make a best effort to print details about buffer leaks.
    + *
    + */
    +public class TestByteBufAllocator extends PooledByteBufAllocator {
    +    private static AtomicReference<TestByteBufAllocator> INSTANCE =
    +            new AtomicReference<>(null);
    +
    +    /**
    +     * Get the singleton testing allocator.
    +     * @return the singleton allocator, creating it if one does not exist.
    +     */
    +    public static TestByteBufAllocator getInstance() {
    +        TestByteBufAllocator result = INSTANCE.get();
    +        if (result == null) {
    +            ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel();
    +            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    +            INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel));
    +            result = INSTANCE.get();
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     * Destroys the singleton testing allocator and throws an error if any of the
    +     * buffers allocated by it have been leaked. Attempts to print leak details to
    +     * standard error before throwing, by using netty's built-in leak tracking.
    +     * Note that this might not always work, since it only triggers when a buffer
    +     * is garbage-collected and calling System.gc() does not guarantee that a buffer
    +     * will actually be GC'ed.
    +     *
    +     * This should be called at the end of a unit test's tearDown() method.
    +     */
    +    public static void checkForLeaks() {
    +        TestByteBufAllocator result = INSTANCE.getAndSet(null);
    +        if (result != null) {
    +            result.checkInstanceForLeaks();
    +        }
    +    }
    +
    +    private final List<ByteBuf> trackedBuffers = new ArrayList<>();
    +    private final ResourceLeakDetector.Level oldLevel;
    +
    +    private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel)
    +    {
    +        super(false);
    +        this.oldLevel = oldLevel;
    +    }
    +
    +    @Override
    +    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
    +    {
    +        return track(super.newHeapBuffer(initialCapacity, maxCapacity));
    +    }
    +
    +    @Override
    +    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
    +    {
    +        return track(super.newDirectBuffer(initialCapacity, maxCapacity));
    +    }
    +
    +    @Override
    +    public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
    +    {
    +        return track(super.compositeHeapBuffer(maxNumComponents));
    +    }
    +
    +    @Override
    +    public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
    +    {
    +        return track(super.compositeDirectBuffer(maxNumComponents));
    +    }
    +
    +    private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf)
    +    {
    +        trackedBuffers.add(Objects.requireNonNull(byteBuf));
    +        return byteBuf;
    +    }
    +
    +    private synchronized ByteBuf track(ByteBuf byteBuf)
    +    {
    +        trackedBuffers.add(Objects.requireNonNull(byteBuf));
    +        return byteBuf;
    +    }
    +
    +    private void checkInstanceForLeaks()
    +    {
    +        try {
    +            long referencedBuffersCount = 0;
    +            synchronized (this) {
    +                referencedBuffersCount = trackedBuffers.stream()
    +                        .filter(byteBuf -> byteBuf.refCnt() > 0)
    +                        .count();
    +                // Make tracked buffers eligible for GC
    +                trackedBuffers.clear();
    +            }
    +            // Throw an error if there were any leaked buffers
    +            if (referencedBuffersCount > 0) {
    +                // Trigger a GC. This will hopefully (but not necessarily) print
    +                // details about detected leaks to standard error before the error
    +                // is thrown.
    +                System.gc();
    +                throw new AssertionError("Found a netty ByteBuf leak!");
    --- End diff --
    
    Are you testing Netty or our code for leaks?


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226849723
  
    --- Diff: ivy.xml ---
    @@ -59,9 +59,11 @@
         <dependency org="org.apache.yetus" name="audience-annotations"
                     rev="${audience-annotations.version}"/>
     
    -    <dependency org="io.netty" name="netty" conf="default" rev="${netty.version}">
    -      <artifact name="netty" type="jar" conf="default"/>
    -    </dependency>
    +    <dependency org="io.netty" name="netty-common" conf="default" rev="${netty.version}" />
    --- End diff --
    
    I forgot about this.
    I think it is better to use netty-all because it bundles all of the native artifacts.
    In BookKeeper, for instance, we use netty-all, and this is common practice in many other projects.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @normanmaurer a review from you would be very much appreciated! I bought your book (Netty in Action) which helped me quite a bit :) This is the version of the code we've been testing on a real cluster, so I think it is pretty ready for review. There may be some minor changes still coming, but nothing significant.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by dain <gi...@git.apache.org>.
Github user dain commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r225332620
  
    --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.common;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.buffer.PooledByteBufAllocator;
    +import io.netty.util.ResourceLeakDetector;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * This is a custom ByteBufAllocator that tracks outstanding allocations and
    + * crashes the program if any of them are leaked.
    + *
    + * Never use this class in production, it will cause your server to run out
    + * of memory! This is because it holds strong references to all allocated
    + * buffers and doesn't release them until checkForLeaks() is called at the
    + * end of a unit test.
    + *
    + * Note: the original code was copied from https://github.com/airlift/drift,
    + * with the permission and encouragement of airlift's author (dain). Airlift
    + * uses the same apache 2.0 license as Zookeeper so this should be ok.
    + *
    + * However, the code was modified to take advantage of Netty's built-in
    + * leak tracking and make a best effort to print details about buffer leaks.
    + */
    +public class TestByteBufAllocator extends PooledByteBufAllocator {
    +    private static AtomicReference<TestByteBufAllocator> INSTANCE =
    +            new AtomicReference<>(null);
    +
    +    /**
    +     * Get the singleton testing allocator.
    +     * @return the singleton allocator, creating it if one does not exist.
    +     */
    +    public static TestByteBufAllocator getInstance() {
    +        TestByteBufAllocator result = INSTANCE.get();
    +        if (result == null) {
    +            // Note: the leak detector level never gets reset after this,
    +            // but that's probably ok since this is only used by test code.
    +            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    +            INSTANCE.compareAndSet(null, new TestByteBufAllocator());
    +            result = INSTANCE.get();
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     * Destroys the singleton testing allocator and throws an error if any of the
    +     * buffers allocated by it have been leaked. Attempts to print leak details to
    +     * standard error before throwing, by using netty's built-in leak tracking.
    +     * Note that this might not always work, since it only triggers when a buffer
    +     * is garbage-collected and calling System.gc() does not guarantee that a buffer
    +     * will actually be GC'ed.
    +     *
    +     * This should be called at the end of a unit test's tearDown() method.
    +     */
    +    public static void checkForLeaks() {
    +        TestByteBufAllocator result = INSTANCE.getAndSet(null);
    +        if (result != null) {
    +            result.checkInstanceForLeaks();
    +        }
    +
    +    }
    +
    +    private final List<ByteBuf> trackedBuffers = new ArrayList<>();
    +
    +    public TestByteBufAllocator()
    +    {
    +        super(false);
    +    }
    +
    +    @Override
    +    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
    +    {
    +        return track(super.newHeapBuffer(initialCapacity, maxCapacity));
    +    }
    +
    +    @Override
    +    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
    +    {
    +        return track(super.newDirectBuffer(initialCapacity, maxCapacity));
    +    }
    +
    +    @Override
    +    public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
    +    {
    +        return track(super.compositeHeapBuffer(maxNumComponents));
    +    }
    +
    +    @Override
    +    public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
    +    {
    +        return track(super.compositeDirectBuffer(maxNumComponents));
    +    }
    +
    +    private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf)
    +    {
    +        trackedBuffers.add(byteBuf);
    +        return byteBuf;
    +    }
    +
    +    private synchronized ByteBuf track(ByteBuf byteBuf)
    +    {
    +        trackedBuffers.add(byteBuf);
    +        return byteBuf;
    +    }
    +
    +    private void checkInstanceForLeaks()
    +    {
    +        long referencedBuffersCount = 0;
    +        synchronized (this) {
    +            referencedBuffersCount = trackedBuffers.stream()
    +                    .filter(byteBuf -> byteBuf.refCnt() > 0)
    +                    .count();
    +            // Make tracked buffers eligible for GC
    +            trackedBuffers.clear();
    +        }
    +        // Trigger a GC. This will hopefully (but not necessarily) print details
    +        // about detected leaks to standard error before the error is thrown.
    +        System.gc();
    --- End diff --
    
    I think you want this inside of the if statement below, since it is only needed if there is a leak.
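
A minimal sketch of the suggested ordering, assuming a simplified tracker: `CountedBuffer` and `LeakTrackerSketch` are hypothetical stand-ins for Netty's `ByteBuf` and the `TestByteBufAllocator` in the diff, and only model the reference count. The GC hint is issued on the failure path only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted buffer; not Netty's ByteBuf.
final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    void release() {
        refCnt.decrementAndGet();
    }
}

// Hypothetical tracker that mirrors the shape of checkInstanceForLeaks().
final class LeakTrackerSketch {
    private final List<CountedBuffer> tracked = new ArrayList<>();

    synchronized CountedBuffer allocate() {
        CountedBuffer buf = new CountedBuffer();
        tracked.add(buf);
        return buf;
    }

    void checkForLeaks() {
        long leaked;
        synchronized (this) {
            leaked = tracked.stream().filter(b -> b.refCnt() > 0).count();
            // Make tracked buffers eligible for GC.
            tracked.clear();
        }
        if (leaked > 0) {
            // Only request a GC when there is something to report; a clean
            // run skips the cost entirely.
            System.gc();
            throw new AssertionError("Found " + leaked + " leaked buffer(s)");
        }
    }
}
```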


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @eolivelli changes you requested:
    - use Epoll if available
    - if (LOG.isDebugEnabled()) around complex LOG.debug() statements
    - use netty-all artifact
    
    Haven't looked into voidPromise() yet.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by enixon <gi...@git.apache.org>.
Github user enixon commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @eolivelli , good find with EPoll.  :)
    
    When @ivmaykov first mentioned using EPoll to me as a potential optimization, I recommended leaving it for later so we would do the reviewers a favor and keep the complexity of this pull request relatively low. We do think it's the right implementation to use here, the only question is where/when to make that contribution.



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    Fixed various issues in netty code


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233652712
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +105,102 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return channel != null || connectFuture != null;
    --- End diff --
    
    As the comment above says, the `isConnected()` method is only used in the main loop inside `ClientCnxn$SendThread.run()` to see if a new connection should be initiated. So, this method should return true if a connection attempt is already in progress, which is the case when `connectFuture` is not null. Arguably the method should be called `isConnectedOrConnecting()`, but I didn't want to go around refactoring APIs in this diff. I can do it if you like.
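
The state tracking behind `channel != null || connectFuture != null` can be sketched without Netty. This is a simplified model, not the patch itself: `ConnectStateSketch` is a hypothetical class, a `CompletableFuture<String>` stands in for the `ChannelFuture`, a `String` stands in for the channel, and clearing the pending future on success is an assumption about the surrounding code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the client's connect bookkeeping.
final class ConnectStateSketch {
    private final ReentrantLock connectLock = new ReentrantLock();
    private CompletableFuture<String> connectFuture; // pending attempt, or null
    private String channel;                          // established connection, or null

    /** Starts an attempt; whoever completes the returned future plays the network. */
    CompletableFuture<String> connect() {
        connectLock.lock();
        try {
            CompletableFuture<String> attempt = new CompletableFuture<>();
            connectFuture = attempt;
            attempt.thenAccept(ch -> onConnected(attempt, ch));
            return attempt;
        } finally {
            connectLock.unlock();
        }
    }

    private void onConnected(CompletableFuture<String> attempt, String ch) {
        // The lock guarantees the channel is not assigned after cleanup().
        connectLock.lock();
        try {
            if (connectFuture != attempt) {
                // Cancelled by cleanup(); a real client would close the
                // channel here to avoid leaking a file descriptor.
                return;
            }
            channel = ch;
            connectFuture = null;
        } finally {
            connectLock.unlock();
        }
    }

    void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    /** True while connected or while an attempt is pending, so no second attempt starts. */
    boolean isConnectedOrConnecting() {
        connectLock.lock();
        try {
            return channel != null || connectFuture != null;
        } finally {
            connectLock.unlock();
        }
    }
}
```

A send loop that polls this method would start a new connection only when it returns false, that is, when there is neither a channel nor a pending attempt.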


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226756052
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
             p.createBB();
             updateLastSend();
             sentCount++;
    -        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    +        channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
    --- End diff --
    
    Can you explain what the purpose of that would be? According to the documentation, voidPromise() returns a promise that will never be notified of success or failure.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/zookeeper/pull/669


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @eolivelli use `voidPromise()` to avoid allocations when writing to channel
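
In Netty 4 the change presumably takes the form `channel.writeAndFlush(msg, channel.voidPromise())`. The idea behind it can be modelled in plain Java; everything in the sketch below (`VoidPromiseSketch`, `WritePromise`, `TrackingPromise`, `VOID_PROMISE`) is a hypothetical miniature and not Netty's API. It shows only the trade-off: a shared no-op promise saves one allocation per write, at the price of never reporting completion.

```java
// Hypothetical miniature of a channel's write path; none of these types are Netty's.
final class VoidPromiseSketch {
    interface WritePromise {
        void setSuccess();
        boolean isDone();
    }

    // Counts promise objects created by write calls.
    static int promisesAllocated = 0;

    static final class TrackingPromise implements WritePromise {
        private boolean done;

        TrackingPromise() {
            promisesAllocated++;
        }

        @Override
        public void setSuccess() {
            done = true;
        }

        @Override
        public boolean isDone() {
            return done;
        }
    }

    // Shared no-op promise: completion is dropped, so it never reports done.
    static final WritePromise VOID_PROMISE = new WritePromise() {
        @Override
        public void setSuccess() {
        }

        @Override
        public boolean isDone() {
            return false;
        }
    };

    // Default path: a fresh promise per write so the caller can observe the outcome.
    static WritePromise writeAndFlush(String msg) {
        return writeAndFlush(msg, new TrackingPromise());
    }

    // Caller-supplied promise: passing VOID_PROMISE avoids the per-write allocation.
    static WritePromise writeAndFlush(String msg, WritePromise promise) {
        promise.setSuccess(); // pretend the write completed immediately
        return promise;
    }
}
```

This only fits call sites that ignore the write result, as `sendPkt` does in the diff above; code that needs to react to a failed write still needs a real promise.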


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226690017
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
    -                if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    -                }
    +                LOG.trace("Channel inactive caused close {}", cnxn);
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    -                }
    +                LOG.debug("Closing {}", cnxn);
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    +                    // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
    +                    // after either of those. Check for null just to be safe ...
    +                    if (cnxn != null) {
    +                        cnxn.processQueuedBuffer();
    +                    }
    +                    ctx.channel().config().setAutoRead(true);
    +                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
    +                    LOG.debug("Received AutoReadEvent.DISABLE");
    +                    ctx.channel().config().setAutoRead(false);
                     }
    -            } catch(Exception ex) {
    -                LOG.error("Unexpected exception in receive", ex);
    -                throw ex;
    +            } finally {
    +                ReferenceCountUtil.release(evt);
                 }
             }
     
    -        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
    -            if (LOG.isDebugEnabled()) {
    -                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
    -                        + cnxn.queuedBuffer);
    -            }
    -
    -            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
    -                LOG.debug("Received ResumeMessageEvent");
    -                if (cnxn.queuedBuffer != null) {
    -                    if (LOG.isTraceEnabled()) {
    -                        LOG.trace("processing queue "
    -                                + Long.toHexString(cnxn.sessionId)
    -                                + " queuedBuffer 0x"
    -                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                    }
    -                    cnxn.receiveMessage(cnxn.queuedBuffer);
    -                    if (!cnxn.queuedBuffer.readable()) {
    -                        LOG.debug("Processed queue - no bytes remaining");
    -                        cnxn.queuedBuffer = null;
    -                    } else {
    -                        LOG.debug("Processed queue - bytes remaining");
    -                    }
    -                } else {
    -                    LOG.debug("queue empty");
    -                }
    -                cnxn.channel.setReadable(true);
    -            } else {
    -                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
    -                if (LOG.isTraceEnabled()) {
    -                    LOG.trace(Long.toHexString(cnxn.sessionId)
    -                            + " buf 0x"
    -                            + ChannelBuffers.hexDump(buf));
    -                }
    -                
    -                if (cnxn.throttled) {
    -                    LOG.debug("Received message while throttled");
    -                    // we are throttled, so we need to queue
    -                    if (cnxn.queuedBuffer == null) {
    -                        LOG.debug("allocating queue");
    -                        cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
    -                    }
    -                    cnxn.queuedBuffer.writeBytes(buf);
    -                    if (LOG.isTraceEnabled()) {
    -                        LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                + " queuedBuffer 0x"
    -                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                    }
    -                } else {
    -                    LOG.debug("not throttled");
    -                    if (cnxn.queuedBuffer != null) {
    -                        if (LOG.isTraceEnabled()) {
    -                            LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                    + " queuedBuffer 0x"
    -                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                        }
    -                        cnxn.queuedBuffer.writeBytes(buf);
    -                        if (LOG.isTraceEnabled()) {
    -                            LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                    + " queuedBuffer 0x"
    -                                    + ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                        }
    -
    -                        cnxn.receiveMessage(cnxn.queuedBuffer);
    -                        if (!cnxn.queuedBuffer.readable()) {
    -                            LOG.debug("Processed queue - no bytes remaining");
    -                            cnxn.queuedBuffer = null;
    -                        } else {
    -                            LOG.debug("Processed queue - bytes remaining");
    -                        }
    +        @Override
    +        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    +            try {
    +                LOG.trace("message received called {}", msg);
    --- End diff --
    
    isTraceEnabled?
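
On the `isTraceEnabled?` question: with parameterized logging the message is only formatted when the level is enabled, but the arguments are still evaluated at the call site. A guard therefore mainly pays off when an argument is expensive to build, as the `ChannelBuffers.hexDump(...)` calls in the removed code were. A minimal sketch of that difference follows. It uses `java.util.logging` from the JDK instead of the SLF4J logger in the patch, and `expensiveDump` and `dumpCalls` are hypothetical names introduced only for this illustration.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class TraceGuardSketch {
    static final Logger LOG = Logger.getLogger("TraceGuardSketch");

    // Counts how often the expensive argument was actually computed.
    static int dumpCalls = 0;

    // Hypothetical stand-in for an expensive argument such as a hex dump.
    static String expensiveDump(byte[] data) {
        dumpCalls++;
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    static void logUnguarded(byte[] data) {
        // The argument is computed even when FINEST is disabled;
        // only the message formatting is skipped.
        LOG.log(Level.FINEST, "message received {0}", expensiveDump(data));
    }

    static void logGuarded(byte[] data) {
        // The guard skips the argument computation as well.
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.log(Level.FINEST, "message received {0}", expensiveDump(data));
        }
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO); // trace-level output disabled
        logUnguarded(new byte[] {1, 2, 3});
        logGuarded(new byte[] {1, 2, 3});
        System.out.println("expensive dumps computed: " + dumpCalls);
    }
}
```

When the argument is just an object reference, as with `msg` in the new `channelRead()`, the unguarded form costs little, so the guard is largely a matter of style there.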


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226755285
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -68,18 +70,21 @@
     public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
     
    -    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
    -            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
    -    Channel channel;
    -    CountDownLatch firstConnect;
    -    ChannelFuture connectFuture;
    -    Lock connectLock = new ReentrantLock();
    -    AtomicBoolean disconnected = new AtomicBoolean();
    -    AtomicBoolean needSasl = new AtomicBoolean();
    -    Semaphore waitSasl = new Semaphore(0);
    +    private final EventLoopGroup eventLoopGroup;
    +    private Channel channel;
    +    private CountDownLatch firstConnect;
    +    private ChannelFuture connectFuture;
    +    private final Lock connectLock = new ReentrantLock();
    +    private final AtomicBoolean disconnected = new AtomicBoolean();
    +    private final AtomicBoolean needSasl = new AtomicBoolean();
    +    private final Semaphore waitSasl = new Semaphore(0);
    +
    +    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
    +            new AtomicReference<>(null);
     
         ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
             this.clientConfig = clientConfig;
    +        eventLoopGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
    --- End diff --
    
    I'd like to do it in a follow-up diff. I was thinking we default to NIO (since it works on all OSes), and have a config option to use Epoll instead.
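
The follow-up described here (NIO by default, epoll behind a config option) could look roughly like the sketch below. It is an illustration only: the property name `zookeeper.netty.useEpoll`, the `Transport` enum, and the `select` helper are hypothetical and not part of this patch. A real implementation would also have to check that Netty's native epoll transport can actually be loaded, not just inspect the OS name.

```java
import java.util.Locale;
import java.util.Properties;

class TransportSelectionSketch {
    enum Transport { NIO, EPOLL }

    // Hypothetical property name, used only for this illustration.
    static final String USE_EPOLL_PROPERTY = "zookeeper.netty.useEpoll";

    // Returns EPOLL only when it was explicitly requested and the OS is Linux;
    // every other combination falls back to NIO, which works on all OSes.
    static Transport select(Properties config, String osName) {
        boolean requested =
                Boolean.parseBoolean(config.getProperty(USE_EPOLL_PROPERTY, "false"));
        boolean linux = osName.toLowerCase(Locale.ROOT).contains("linux");
        return (requested && linux) ? Transport.EPOLL : Transport.NIO;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(USE_EPOLL_PROPERTY, "true");
        System.out.println(select(config, System.getProperty("os.name")));
    }
}
```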


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2588/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226757240
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -335,29 +260,34 @@ public void operationComplete(ChannelFuture future)
         CnxnChannelHandler channelHandler = new CnxnChannelHandler();
     
         NettyServerCnxnFactory() {
    -        bootstrap = new ServerBootstrap(
    -                new NioServerSocketChannelFactory(
    -                        Executors.newCachedThreadPool(),
    -                        Executors.newCachedThreadPool()));
    -        // parent channel
    -        bootstrap.setOption("reuseAddress", true);
    -        // child channels
    -        bootstrap.setOption("child.tcpNoDelay", true);
    -        /* set socket linger to off, so that socket close does not block */
    -        bootstrap.setOption("child.soLinger", -1);
    -        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    -            @Override
    -            public ChannelPipeline getPipeline() throws Exception {
    -                ChannelPipeline p = Channels.pipeline();
    -                if (secure) {
    -                    initSSL(p);
    -                }
    -                p.addLast("servercnxnfactory", channelHandler);
    -
    -                return p;
    -            }
    -        });
             x509Util = new ClientX509Util();
    +
    +        EventLoopGroup bossGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
    --- End diff --
    
    See comment above about making epoll optional based on a config option.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226684843
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
                     }
                 }
                 wakeupCnxn();
    +            // Note: SimpleChannelInboundHandler releases the ByteBuf for us
    +            // so we don't need to do it.
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx,
    -                                    ExceptionEvent e) throws Exception {
    -            LOG.warn("Exception caught: {}", e, e.getCause());
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    +            LOG.warn("Exception caught", cause);
                 cleanup();
             }
         }
    +
    +    /**
    +     * Sets the test ByteBufAllocator. This allocator will be used by all
    +     * future instances of this class.
    +     * It is not recommended to use this method outside of testing.
    +     * @param allocator the ByteBufAllocator to use for all netty buffer
    +     *                  allocations.
    +     */
    +    public static void setTestAllocator(ByteBufAllocator allocator) {
    +        TEST_ALLOCATOR.set(allocator);
    --- End diff --
    
    I think this is a security hole. We are in the client, so untrusted code may call this public method.
    We should use Mockito/PowerMock for this kind of thing.
    Is there another way?
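
One possible answer to the question above, sketched under assumptions, is to narrow the visibility of the hook instead of mocking: a package-private setter is reachable from tests in the same package but is not part of the public client API. The `Allocator` interface below is a hypothetical stand-in for Netty's `ByteBufAllocator`, and `allocatorInUse` and `clearTestAllocator` are illustrative names. Package-private access is not a hard security boundary (reflection can still reach it), so this only reduces the exposed surface.

```java
import java.util.concurrent.atomic.AtomicReference;

class TestHookVisibilitySketch {
    // Hypothetical stand-in for Netty's ByteBufAllocator.
    interface Allocator {
        String name();
    }

    static final Allocator DEFAULT = () -> "default";

    private static final AtomicReference<Allocator> TEST_ALLOCATOR =
            new AtomicReference<>(null);

    // Package-private: callable from tests in the same package only.
    static void setTestAllocator(Allocator allocator) {
        TEST_ALLOCATOR.set(allocator);
    }

    // Package-private: lets a test restore the default afterwards.
    static void clearTestAllocator() {
        TEST_ALLOCATOR.set(null);
    }

    // Falls back to the default when no test allocator is installed.
    static Allocator allocatorInUse() {
        Allocator test = TEST_ALLOCATOR.get();
        return test != null ? test : DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(allocatorInUse().name());
        setTestAllocator(() -> "test");
        System.out.println(allocatorInUse().name());
        clearTestAllocator();
        System.out.println(allocatorInUse().name());
    }
}
```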


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2445/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226755419
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +108,95 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return connectFuture != null || channel != null;
    +        } finally {
    +            connectLock.unlock();
    +        }
         }
     
         @Override
         void connect(InetSocketAddress addr) throws IOException {
             firstConnect = new CountDownLatch(1);
     
    -        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
    -
    -        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    -        bootstrap.setOption("soLinger", -1);
    -        bootstrap.setOption("tcpNoDelay", true);
    -
    -        connectFuture = bootstrap.connect(addr);
    -        connectFuture.addListener(new ChannelFutureListener() {
    -            @Override
    -            public void operationComplete(ChannelFuture channelFuture) throws Exception {
    -                // this lock guarantees that channel won't be assgined after cleanup().
    -                connectLock.lock();
    -                try {
    -                    if (!channelFuture.isSuccess() || connectFuture == null) {
    -                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
    -                        return;
    -                    }
    -                    // setup channel, variables, connection, etc.
    -                    channel = channelFuture.getChannel();
    -
    -                    disconnected.set(false);
    -                    initialized = false;
    -                    lenBuffer.clear();
    -                    incomingBuffer = lenBuffer;
    -
    -                    sendThread.primeConnection();
    -                    updateNow();
    -                    updateLastSendAndHeard();
    -
    -                    if (sendThread.tunnelAuthInProgress()) {
    -                        waitSasl.drainPermits();
    -                        needSasl.set(true);
    -                        sendPrimePacket();
    -                    } else {
    -                        needSasl.set(false);
    -                    }
    +        Bootstrap bootstrap = new Bootstrap();
    +        bootstrap.group(Objects.requireNonNull(eventLoopGroup))
    +                .channel(NioSocketChannel.class)
    +                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()))
    +                .option(ChannelOption.SO_LINGER, -1)
    +                .option(ChannelOption.TCP_NODELAY, true);
    +        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
    +        if (testAllocator != null) {
    +            bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
    +        }
    +        bootstrap.validate();
    +
    +        connectLock.lock();
    +        try {
    +            connectFuture = bootstrap.connect(addr);
    +            connectFuture.addListener(new ChannelFutureListener() {
    +                @Override
    +                public void operationComplete(ChannelFuture channelFuture) throws Exception {
    +                    // this lock guarantees that channel won't be assigned after cleanup().
    +                    connectLock.lock();
    +                    try {
    +                        if (!channelFuture.isSuccess()) {
    +                            LOG.info("future isn't success, cause:", channelFuture.cause());
    +                            return;
    +                        } else if (connectFuture == null) {
    +                            LOG.info("connect attempt cancelled");
    +                            // If the connect attempt was cancelled but succeeded
    +                            // anyway, make sure to close the channel, otherwise
    +                            // we may leak a file descriptor.
    +                            channelFuture.channel().close();
    --- End diff --
    
    I don't think so, since this code can only trigger if the connect future is successful. If the future is not successful, the previous if branch will be taken.
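
The branch ordering described here can be illustrated with plain JDK types. In the sketch below, `FakeChannel`, `cancel()`, and `onConnectComplete()` are hypothetical stand-ins for the Netty channel, `cleanup()`, and the listener's `operationComplete()`; the returned strings exist only so the three outcomes can be told apart.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConnectCancelSketch {
    // Hypothetical stand-in for a Netty channel.
    static class FakeChannel {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    private final Lock connectLock = new ReentrantLock();
    // Non-null while a connect attempt is pending; null once cancelled.
    private Object connectFuture = new Object();
    private FakeChannel channel;

    // Plays the role of cleanup(): cancels the pending connect attempt.
    void cancel() {
        connectLock.lock();
        try {
            connectFuture = null;
        } finally {
            connectLock.unlock();
        }
    }

    // Plays the role of operationComplete().
    String onConnectComplete(boolean success, FakeChannel result) {
        connectLock.lock();
        try {
            if (!success) {
                // No channel was opened, so there is nothing to close.
                return "failed";
            } else if (connectFuture == null) {
                // Cancelled but connected anyway: close the channel so the
                // file descriptor is not leaked. Only reachable on success.
                result.close();
                return "cancelled";
            }
            channel = result;
            return "connected";
        } finally {
            connectLock.unlock();
        }
    }

    public static void main(String[] args) {
        ConnectCancelSketch sketch = new ConnectCancelSketch();
        sketch.cancel();
        FakeChannel late = new FakeChannel();
        System.out.println(sketch.onConnectComplete(true, late) + ", closed=" + late.closed);
    }
}
```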


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov closed the pull request at:

    https://github.com/apache/zookeeper/pull/669


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
GitHub user ivmaykov reopened a pull request:

    https://github.com/apache/zookeeper/pull/669

    ZOOKEEPER-3152: Port ZK netty stack to netty4

    Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
    
    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
    
    FB Reviewers: nixon

----
commit 94c4516bea9b46f5428ef29ff51490f9647eaac3
Author: Ilya Maykov <il...@...>
Date:   2018-08-31T23:26:55Z

    port ZK netty stack from netty3 to netty4
    
    Summary:
    Ported the client connection netty stack from netty3 to netty4. This includes both the server side
    (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
    
    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
    
    Reviewers: nixon, nwolchko, nedelchev
    
    Subscribers:
    
    Differential Revision: https://phabricator.intern.facebook.com/D9646262
    
    Tasks:
    
    Tags:
    
    Blame Revision:

----


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2603/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233649908
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    --- End diff --
    
    We call `cnxn.close()` at the end of `exceptionCaught()`. That closes the channel, which in turn triggers `channelInactive()`, so I think removing the channel from `allChannels` here as well would be redundant.
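
The interplay between the two callbacks can be sketched with JDK types only. The `getAndSet(null)` on the attribute means that whichever callback runs first takes ownership of the connection, so `close()` runs at most once even when both fire. `FakeCnxn`, the string channel id, and the direct call from `exceptionCaught()` to `channelInactive()` are assumptions made for this illustration; in the real code the second callback is fired by Netty once the channel is closed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class CloseOnceSketch {
    // Hypothetical stand-in for NettyServerCnxn.
    static class FakeCnxn {
        int closeCalls = 0;

        void close() {
            closeCalls++;
        }
    }

    final Set<String> allChannels = new HashSet<>();
    final AtomicReference<FakeCnxn> attr = new AtomicReference<>();
    final String channelId;

    CloseOnceSketch(String channelId, FakeCnxn cnxn) {
        this.channelId = channelId;
        allChannels.add(channelId);
        attr.set(cnxn);
    }

    void exceptionCaught() {
        // Atomically take the connection; a later callback will see null.
        FakeCnxn cnxn = attr.getAndSet(null);
        if (cnxn != null) {
            cnxn.close();
            // Assumption: closing the connection closes the channel,
            // which then fires channelInactive().
            channelInactive();
        }
    }

    void channelInactive() {
        // The only place the channel is removed from the group.
        allChannels.remove(channelId);
        FakeCnxn cnxn = attr.getAndSet(null);
        if (cnxn != null) {
            cnxn.close();
        }
    }

    public static void main(String[] args) {
        FakeCnxn cnxn = new FakeCnxn();
        CloseOnceSketch handler = new CloseOnceSketch("ch-1", cnxn);
        handler.exceptionCaught();
        System.out.println("close calls: " + cnxn.closeCalls
                + ", tracked channels: " + handler.allChannels.size());
    }
}
```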


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @maoling It will be hard for us to do a perf comparison of netty3 vs netty4, because we are currently using the NIO transports and we don't plan on switching to netty3 in production. Comparing NIO vs netty4 is kind of apples-to-oranges. A quick perf test with this patch showed that going from NIO to netty4 did not cause any obvious performance regressions, so I think we can say they are at least comparable :)


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2483/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233278670
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    --- End diff --
    
    You remove `ctx.channel()` from `allChannels` in `channelInactive()`. That was not the case in the original implementation, but I think it makes perfect sense.
    
    Don't you want to do the same here?


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233663197
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    That's why I'm saying to keep removing the connection attribute in both methods. :)



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2480/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226756739
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---
    @@ -200,24 +186,13 @@ public void setSessionId(long sessionId) {
             this.sessionId = sessionId;
         }
     
    -    @Override
    -    public void enableRecv() {
    -        if (throttled) {
    -            throttled = false;
    -            if (LOG.isDebugEnabled()) {
    -                LOG.debug("Sending unthrottle event " + this);
    -            }
    -            channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
    -        }
    -    }
    -
         @Override
         public void sendBuffer(ByteBuffer sendBuffer) {
             if (sendBuffer == ServerCnxnFactory.closeConn) {
                 close();
                 return;
             }
    -        channel.write(wrappedBuffer(sendBuffer));
    +        channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer));
    --- End diff --
    
    As above, I'm not sure what that provides. I am still learning about netty so please excuse my ignorance :)


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2439/



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2496/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233651730
  
    --- Diff: zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.buffer.PooledByteBufAllocator;
    +import io.netty.util.ResourceLeakDetector;
    +
    +/**
    + * This is a custom ByteBufAllocator that tracks outstanding allocations and
    + * crashes the program if any of them are leaked.
    + *
    + * Never use this class in production, it will cause your server to run out
    + * of memory! This is because it holds strong references to all allocated
    + * buffers and doesn't release them until checkForLeaks() is called at the
    + * end of a unit test.
    + *
    + * Note: the original code was copied from https://github.com/airlift/drift,
    + * with the permission and encouragement of airlift's author (dain). Airlift
    + * uses the same apache 2.0 license as Zookeeper so this should be ok.
    + *
    + * However, the code was modified to take advantage of Netty's built-in
    + * leak tracking and make a best effort to print details about buffer leaks.
    + *
    + */
    +public class TestByteBufAllocator extends PooledByteBufAllocator {
    +    private static AtomicReference<TestByteBufAllocator> INSTANCE =
    +            new AtomicReference<>(null);
    +
    +    /**
    +     * Get the singleton testing allocator.
    +     * @return the singleton allocator, creating it if one does not exist.
    +     */
    +    public static TestByteBufAllocator getInstance() {
    +        TestByteBufAllocator result = INSTANCE.get();
    +        if (result == null) {
    +            ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel();
    +            ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    +            INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel));
    +            result = INSTANCE.get();
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     * Destroys the singleton testing allocator and throws an error if any of the
    +     * buffers allocated by it have been leaked. Attempts to print leak details to
    +     * standard error before throwing, by using netty's built-in leak tracking.
    +     * Note that this might not always work, since it only triggers when a buffer
    +     * is garbage-collected and calling System.gc() does not guarantee that a buffer
    +     * will actually be GC'ed.
    +     *
    +     * This should be called at the end of a unit test's tearDown() method.
    +     */
    +    public static void checkForLeaks() {
    +        TestByteBufAllocator result = INSTANCE.getAndSet(null);
    +        if (result != null) {
    +            result.checkInstanceForLeaks();
    +        }
    +    }
    +
    +    private final List<ByteBuf> trackedBuffers = new ArrayList<>();
    +    private final ResourceLeakDetector.Level oldLevel;
    +
    +    private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel)
    +    {
    +        super(false);
    +        this.oldLevel = oldLevel;
    +    }
    +
    +    @Override
    +    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
    +    {
    +        return track(super.newHeapBuffer(initialCapacity, maxCapacity));
    +    }
    +
    +    @Override
    +    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
    +    {
    +        return track(super.newDirectBuffer(initialCapacity, maxCapacity));
    +    }
    +
    +    @Override
    +    public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
    +    {
    +        return track(super.compositeHeapBuffer(maxNumComponents));
    +    }
    +
    +    @Override
    +    public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
    +    {
    +        return track(super.compositeDirectBuffer(maxNumComponents));
    +    }
    +
    +    private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf)
    +    {
    +        trackedBuffers.add(Objects.requireNonNull(byteBuf));
    +        return byteBuf;
    +    }
    +
    +    private synchronized ByteBuf track(ByteBuf byteBuf)
    +    {
    +        trackedBuffers.add(Objects.requireNonNull(byteBuf));
    +        return byteBuf;
    +    }
    +
    +    private void checkInstanceForLeaks()
    +    {
    +        try {
    +            long referencedBuffersCount = 0;
    +            synchronized (this) {
    +                referencedBuffersCount = trackedBuffers.stream()
    +                        .filter(byteBuf -> byteBuf.refCnt() > 0)
    +                        .count();
    +                // Make tracked buffers eligible for GC
    +                trackedBuffers.clear();
    +            }
    +            // Throw an error if there were any leaked buffers
    +            if (referencedBuffersCount > 0) {
    +                // Trigger a GC. This will hopefully (but not necessarily) print
    +                // details about detected leaks to standard error before the error
    +                // is thrown.
    +                System.gc();
    +                throw new AssertionError("Found a netty ByteBuf leak!");
    --- End diff --
    
    This tests our code for misuse of the netty 4 library. If we forget to call `ReferenceCountUtil.release(obj)` on a ref-counted object that we are responsible for releasing, this code will trigger. Forgetting to do this in production would leak memory and eventually OOM the process, so it's a good thing to test for.
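
The leak check described above can be sketched without netty on the classpath. This is a simplified illustration only: `LeakCheckSketch` and `RefCounted` are hypothetical stand-ins, not netty's `ByteBufAllocator` or `ByteBuf`, and the real `TestByteBufAllocator` additionally relies on netty's `ResourceLeakDetector`. The idea it shows is the same one used in `checkInstanceForLeaks()`: remember every allocation, then count the ones whose reference count is still above zero.

```java
import java.util.ArrayList;
import java.util.List;

final class LeakCheckSketch {
    /** Hypothetical, simplified ref-counted buffer; not netty's ByteBuf. */
    static final class RefCounted {
        private int refCnt = 1; // a freshly allocated buffer holds one reference

        synchronized int refCnt() {
            return refCnt;
        }

        synchronized void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    // Strong references to every allocation, as in the test allocator.
    private final List<RefCounted> tracked = new ArrayList<>();

    synchronized RefCounted allocate() {
        RefCounted buf = new RefCounted();
        tracked.add(buf);
        return buf;
    }

    /** Counts buffers that were never fully released, then forgets them all. */
    synchronized long countLeaksAndReset() {
        long leaked = tracked.stream().filter(b -> b.refCnt() > 0).count();
        tracked.clear();
        return leaked;
    }

    public static void main(String[] args) {
        LeakCheckSketch allocator = new LeakCheckSketch();
        RefCounted released = allocator.allocate();
        released.release();   // correctly released
        allocator.allocate(); // never released, so it counts as a leak
        System.out.println("leaked buffers: " + allocator.countLeaksAndReset());
    }
}
```

In a unit test, a non-zero count at tearDown time would be turned into a failure, which is what the `AssertionError` in the diff does.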


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226687650
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---
    @@ -200,24 +186,13 @@ public void setSessionId(long sessionId) {
             this.sessionId = sessionId;
         }
     
    -    @Override
    -    public void enableRecv() {
    -        if (throttled) {
    -            throttled = false;
    -            if (LOG.isDebugEnabled()) {
    -                LOG.debug("Sending unthrottle event " + this);
    -            }
    -            channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
    -        }
    -    }
    -
         @Override
         public void sendBuffer(ByteBuffer sendBuffer) {
             if (sendBuffer == ServerCnxnFactory.closeConn) {
                 close();
                 return;
             }
    -        channel.write(wrappedBuffer(sendBuffer));
    +        channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer));
    --- End diff --
    
    Consider using voidPromise()


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233659698
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    If `cnxn.close();` triggers a channelInactive event too, you need to remove it, otherwise `close()` will be called twice.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233288683
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +105,102 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return channel != null || connectFuture != null;
    --- End diff --
    
    Why would you like to check `connectFuture` too?


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233651445
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
                         if (KeeperException.Code.OK !=
                                 authProvider.handleAuthentication(cnxn, null)) {
                             LOG.error("Authentication failed for session 0x{}",
    -                                Long.toHexString(cnxn.sessionId));
    +                                Long.toHexString(cnxn.getSessionId()));
                             cnxn.close();
                             return;
                         }
     
    -                    allChannels.add(future.getChannel());
    +                    final Channel futureChannel = future.getNow();
    --- End diff --
    
    I think they are equivalent here since we know the future is completed (we are inside an `if (future.isSuccess())` block). I can change it to `get()` if you prefer.
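
A rough JDK-only analogue of the equivalence claimed above, as a sketch rather than netty code. It uses `CompletableFuture` instead of netty's `Future`, and the two differ in detail: netty's `getNow()` takes no argument and returns null for an incomplete future, whereas the JDK's `getNow(valueIfAbsent)` takes an explicit fallback. `GetNowSketch` and `readCompleted` are hypothetical names.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class GetNowSketch {
    /**
     * Reads the result of a future the caller has already checked for
     * completion, much as the listener in the diff checks isSuccess() first.
     */
    static String readCompleted(CompletableFuture<String> future) {
        if (!future.isDone()) {
            throw new IllegalStateException("future is not complete yet");
        }
        String nonBlocking = future.getNow(null); // returns immediately
        String blocking;
        try {
            blocking = future.get(); // also immediate: the future is already done
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        if (!Objects.equals(nonBlocking, blocking)) {
            throw new AssertionError("results differ");
        }
        return nonBlocking;
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("channel-1");
        System.out.println(readCompleted(done));
    }
}
```

Once completion has been checked, both calls return the same value without blocking, so the choice between them is mostly about how checked exceptions are handled.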


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226835891
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -68,18 +70,21 @@
     public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
     
    -    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
    -            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
    -    Channel channel;
    -    CountDownLatch firstConnect;
    -    ChannelFuture connectFuture;
    -    Lock connectLock = new ReentrantLock();
    -    AtomicBoolean disconnected = new AtomicBoolean();
    -    AtomicBoolean needSasl = new AtomicBoolean();
    -    Semaphore waitSasl = new Semaphore(0);
    +    private final EventLoopGroup eventLoopGroup;
    +    private Channel channel;
    +    private CountDownLatch firstConnect;
    +    private ChannelFuture connectFuture;
    +    private final Lock connectLock = new ReentrantLock();
    +    private final AtomicBoolean disconnected = new AtomicBoolean();
    +    private final AtomicBoolean needSasl = new AtomicBoolean();
    +    private final Semaphore waitSasl = new Semaphore(0);
    +
    +    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
    +            new AtomicReference<>(null);
     
         ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
             this.clientConfig = clientConfig;
    +        eventLoopGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
    --- End diff --
    
    In almost all of the projects I know that use Netty, you try to use Epoll if it is available and then fall back to NIO.
    I will be happy to create a JIRA and send a diff once we get this merged.


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @ivmaykov I have answered all of your comments.
    I expect this to be the very first step.
    
    With Netty 4 it is easier to reduce memory allocations, especially on the hot paths. I expect that in the future we will introduce a lot of tricks and techniques to leverage the full potential of Netty.


---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233661987
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    It will not be called twice, since removing the connection attribute means the second time we will get null and there is a null check in both places.
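
The null-check argument above can be sketched with a plain `AtomicReference` standing in for the channel attribute. The names here (`IdempotentCloseSketch`, `Connection`, `closeCount`) are hypothetical and for illustration only; the assumption is that the attribute's `getAndSet(null)` behaves like the JDK one, so whichever handler runs first takes the connection and any later caller sees null and skips `close()`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class IdempotentCloseSketch {
    /** Hypothetical stand-in for NettyServerCnxn; counts close() calls. */
    static final class Connection {
        final AtomicInteger closeCount = new AtomicInteger();

        void close() {
            closeCount.incrementAndGet();
        }
    }

    // Stand-in for ctx.channel().attr(CONNECTION_ATTRIBUTE).
    private final AtomicReference<Connection> attr;

    IdempotentCloseSketch(Connection cnxn) {
        this.attr = new AtomicReference<>(cnxn);
    }

    /** Same shape as exceptionCaught() and channelInactive() in the diff. */
    void closeIfPresent() {
        Connection cnxn = attr.getAndSet(null); // atomically take and clear
        if (cnxn != null) {
            cnxn.close();
        }
    }

    public static void main(String[] args) {
        Connection cnxn = new Connection();
        IdempotentCloseSketch handler = new IdempotentCloseSketch(cnxn);
        handler.closeIfPresent(); // e.g. exceptionCaught()
        handler.closeIfPresent(); // e.g. channelInactive() triggered by the close
        System.out.println("close() calls: " + cnxn.closeCount.get());
    }
}
```

Because the take-and-clear is a single atomic step, the same reasoning holds even if the two handlers were ever to run on different threads.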


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    Merged to master branch. Thanks @ivmaykov !


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    @maoling there are no generic benchmarks.
    
    For instance, in BookKeeper we switched to Netty 4 and we are now able to leverage all the cool stuff around memory management, so we are able to be more efficient.
    
    By default Netty 4 prefers off-heap memory; the goal is to make as few memory copies as possible while passing data to the OS.
    
    The simple switch may or may not make overall performance better, and could even make it worse. There will be knobs to tune.
    
    Netty 3 is almost obsolete and IMHO it is better to stay on the latest and greatest. The Netty 4 project is very active.
    
    Another topic is refcounting: Netty 4 lets Java programs work easily with both direct memory and heap memory, and provides very efficient ways to pool memory and Java objects (see the Recycler facility).
    
    So I think this is only the beginning of this journey.



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2528/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226681741
  
    --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -68,18 +70,21 @@
     public class ClientCnxnSocketNetty extends ClientCnxnSocket {
         private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
     
    -    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
    -            Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
    -    Channel channel;
    -    CountDownLatch firstConnect;
    -    ChannelFuture connectFuture;
    -    Lock connectLock = new ReentrantLock();
    -    AtomicBoolean disconnected = new AtomicBoolean();
    -    AtomicBoolean needSasl = new AtomicBoolean();
    -    Semaphore waitSasl = new Semaphore(0);
    +    private final EventLoopGroup eventLoopGroup;
    +    private Channel channel;
    +    private CountDownLatch firstConnect;
    +    private ChannelFuture connectFuture;
    +    private final Lock connectLock = new ReentrantLock();
    +    private final AtomicBoolean disconnected = new AtomicBoolean();
    +    private final AtomicBoolean needSasl = new AtomicBoolean();
    +    private final Semaphore waitSasl = new Semaphore(0);
    +
    +    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
    +            new AtomicReference<>(null);
     
         ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
             this.clientConfig = clientConfig;
    +        eventLoopGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
    --- End diff --
    
    Let's move to Epoll.
    It can be a follow-up change (I can send it if you don't already have it on your stack of changes).


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2479/



---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2438/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233653584
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +105,102 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return channel != null || connectFuture != null;
    +        } finally {
    +            connectLock.unlock();
    +        }
    +    }
    +
    +    private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
    +        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
    +        if (testAllocator != null) {
    +            return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
    +        } else {
    +            return bootstrap;
    +        }
         }
     
         @Override
         void connect(InetSocketAddress addr) throws IOException {
             firstConnect = new CountDownLatch(1);
     
    -        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
    -
    -        bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    -        bootstrap.setOption("soLinger", -1);
    -        bootstrap.setOption("tcpNoDelay", true);
    -
    -        connectFuture = bootstrap.connect(addr);
    -        connectFuture.addListener(new ChannelFutureListener() {
    -            @Override
    -            public void operationComplete(ChannelFuture channelFuture) throws Exception {
    -                // this lock guarantees that channel won't be assgined after cleanup().
    -                connectLock.lock();
    -                try {
    -                    if (!channelFuture.isSuccess() || connectFuture == null) {
    -                        LOG.info("future isn't success, cause: {}", channelFuture.getCause());
    -                        return;
    -                    }
    -                    // setup channel, variables, connection, etc.
    -                    channel = channelFuture.getChannel();
    -
    -                    disconnected.set(false);
    -                    initialized = false;
    -                    lenBuffer.clear();
    -                    incomingBuffer = lenBuffer;
    -
    -                    sendThread.primeConnection();
    -                    updateNow();
    -                    updateLastSendAndHeard();
    -
    -                    if (sendThread.tunnelAuthInProgress()) {
    -                        waitSasl.drainPermits();
    -                        needSasl.set(true);
    -                        sendPrimePacket();
    -                    } else {
    -                        needSasl.set(false);
    -                    }
    +        Bootstrap bootstrap = new Bootstrap()
    +                .group(eventLoopGroup)
    +                .channel(NettyUtils.nioOrEpollSocketChannel())
    +                .option(ChannelOption.SO_LINGER, -1)
    +                .option(ChannelOption.TCP_NODELAY, true)
    +                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    +        bootstrap = configureBootstrapAllocator(bootstrap);
    +        bootstrap.validate();
     
    -                    // we need to wake up on first connect to avoid timeout.
    -                    wakeupCnxn();
    -                    firstConnect.countDown();
    -                    LOG.info("channel is connected: {}", channelFuture.getChannel());
    -                } finally {
    -                    connectLock.unlock();
    +        connectLock.lock();
    +        try {
    +            connectFuture = bootstrap.connect(addr);
    +            connectFuture.addListener(new ChannelFutureListener() {
    +                @Override
    +                public void operationComplete(ChannelFuture channelFuture) throws Exception {
    +                    // this lock guarantees that channel won't be assigned after cleanup().
    +                    connectLock.lock();
    +                    try {
    +                        if (!channelFuture.isSuccess()) {
    +                            LOG.info("future isn't success, cause:", channelFuture.cause());
    +                            return;
    +                        } else if (connectFuture == null) {
    --- End diff --
    
    As the comment below says, there could be a race if the connect attempt was cancelled right around the time the listener callback fired. `cleanup()` below will cancel an in-progress connection attempt and clear the `connectFuture` variable. If this happens, `connectFuture` will be null here.
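
The race described above can be sketched without netty. This is a minimal illustration, not the code from the pull request: it assumes a `CompletableFuture<String>` standing in for netty's `ChannelFuture`, a `String` standing in for the channel, and a hypothetical class name `ConnectRaceSketch`. Only `connectLock`, `connectFuture`, `channel`, and `cleanup()` are names taken from the diff. The point is that the listener must re-check `connectFuture` under the lock, because `cleanup()` may have cleared it between the connect succeeding and the callback running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the client socket; not the real ClientCnxnSocketNetty.
final class ConnectRaceSketch {
    private final ReentrantLock connectLock = new ReentrantLock();
    private CompletableFuture<String> connectFuture; // guarded by connectLock
    private String channel;                          // guarded by connectLock

    // Starts a (simulated) connect attempt and registers the completion callback.
    CompletableFuture<String> connect() {
        CompletableFuture<String> future = new CompletableFuture<>();
        connectLock.lock();
        try {
            connectFuture = future;
        } finally {
            connectLock.unlock();
        }
        future.whenComplete(this::onConnectComplete);
        return future;
    }

    // Listener callback: runs when the connect attempt finishes.
    void onConnectComplete(String connectedChannel, Throwable error) {
        connectLock.lock();
        try {
            if (error != null) {
                return; // connect failed or was cancelled
            }
            if (connectFuture == null) {
                // cleanup() won the race: the attempt succeeded, but it was
                // abandoned before this callback ran, so drop the channel.
                return;
            }
            channel = connectedChannel;
            connectFuture = null;
        } finally {
            connectLock.unlock();
        }
    }

    // Cancels any in-progress attempt and clears connection state.
    void cleanup() {
        connectLock.lock();
        try {
            if (connectFuture != null) {
                connectFuture.cancel(false);
                connectFuture = null;
            }
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    String channel() {
        connectLock.lock();
        try {
            return channel;
        } finally {
            connectLock.unlock();
        }
    }

    public static void main(String[] args) {
        ConnectRaceSketch sketch = new ConnectRaceSketch();
        sketch.connect();
        sketch.cleanup();
        // Simulate the callback firing with a success result after cleanup().
        sketch.onConnectComplete("late-channel", null);
        System.out.println("channel after late callback: " + sketch.channel());
    }
}
```

In this sketch the late callback is invoked by hand to make the interleaving deterministic. In the real client the same ordering would depend on thread timing, which is why the null check sits inside the locked region.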


---

[GitHub] zookeeper issue #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/669
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2441/



---

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

Posted by ivmaykov <gi...@git.apache.org>.
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233663529
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    +        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                 if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    +                LOG.trace("Channel inactive {}", ctx.channel());
                 }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    +                    LOG.trace("Channel inactive caused close {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
                     if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    +                    LOG.debug("Closing {}", cnxn);
                     }
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
    --- End diff --
    
    Ah, right. I think I misunderstood. So the code is good as-is, yes?


---