You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by rg...@apache.org on 2016/06/08 15:43:15 UTC

svn commit: r1747408 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Author: rgs
Date: Wed Jun  8 15:43:15 2016
New Revision: 1747408

URL: http://svn.apache.org/viewvc?rev=1747408&view=rev
Log:
ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne
(Michael Han via rgs)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1747408&r1=1747407&r2=1747408&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Jun  8 15:43:15 2016
@@ -308,6 +308,9 @@ BUGFIXES:
   ZOOKEEPER-2405: getTGT() in Login.java mishandles confidential
   information (Michael Han via phunt)
 
+  ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne
+  (Michael Han via rgs)
+
 IMPROVEMENTS:
   ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1747408&r1=1747407&r2=1747408&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Jun  8 15:43:15 2016
@@ -237,9 +237,7 @@ public class QuorumCnxManager {
      * @param sid
      */
     public void testInitiateConnection(long sid) throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Opening channel to server " + sid);
-        }
+        LOG.debug("Opening channel to server " + sid);
         Socket sock = new Socket();
         setSockOpts(sock);
         sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
@@ -434,17 +432,14 @@ public class QuorumCnxManager {
             LOG.debug("There is a connection already for server " + sid);
             return true;
         }
-        try {
 
-             if (LOG.isDebugEnabled()) {
-                 LOG.debug("Opening channel to server " + sid);
-             }
-             Socket sock = new Socket();
+        Socket sock = null;
+        try {
+             LOG.debug("Opening channel to server " + sid);
+             sock = new Socket();
              setSockOpts(sock);
              sock.connect(electionAddr, cnxTO);
-             if (LOG.isDebugEnabled()) {
-                 LOG.debug("Connected to server " + sid);
-             }
+             LOG.debug("Connected to server " + sid);
              initiateConnection(sock, sid);
              return true;
          } catch (UnresolvedAddressException e) {
@@ -454,11 +449,13 @@ public class QuorumCnxManager {
              // detail.
              LOG.warn("Cannot open channel to " + sid
                      + " at election address " + electionAddr, e);
+             closeSocket(sock);
              throw e;
          } catch (IOException e) {
              LOG.warn("Cannot open channel to " + sid
                      + " at election address " + electionAddr,
                      e);
+             closeSocket(sock);
              return false;
          }
    
@@ -574,6 +571,10 @@ public class QuorumCnxManager {
      *            Reference to socket
      */
     private void closeSocket(Socket sock) {
+        if (sock == null) {
+            return;
+        }
+
         try {
             sock.close();
         } catch (IOException ie) {
@@ -614,7 +615,7 @@ public class QuorumCnxManager {
         public void run() {
             int numRetries = 0;
             InetSocketAddress addr;
-
+            Socket client = null;
             while((!shutdown) && (numRetries < 3)){
                 try {
                     ss = new ServerSocket();
@@ -632,7 +633,7 @@ public class QuorumCnxManager {
                     setName(addr.toString());
                     ss.bind(addr);
                     while (!shutdown) {
-                        Socket client = ss.accept();
+                        client = ss.accept();
                         setSockOpts(client);
                         LOG.info("Received connection request "
                                 + client.getRemoteSocketAddress());
@@ -654,6 +655,7 @@ public class QuorumCnxManager {
                         LOG.error("Interrupted while sleeping. " +
                             "Ignoring exception", ie);
                     }
+                    closeSocket(client);
                 }
             }
             LOG.info("Leaving listener");
@@ -739,9 +741,7 @@ public class QuorumCnxManager {
         }
                 
         synchronized boolean finish() {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Calling finish for " + sid);
-            }
+            LOG.debug("Calling finish for " + sid);
             
             if(!running){
                 /*
@@ -752,16 +752,14 @@ public class QuorumCnxManager {
             
             running = false;
             closeSocket(sock);
-            // channel = null;
 
             this.interrupt();
             if (recvWorker != null) {
                 recvWorker.finish();
             }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
-            }
+            LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
+
             senderWorkerMap.remove(sid, this);
             threadCnt.decrementAndGet();
             return running;
@@ -919,9 +917,7 @@ public class QuorumCnxManager {
             } finally {
                 LOG.warn("Interrupting SendWorker");
                 sw.finish();
-                if (sock != null) {
-                    closeSocket(sock);
-                }
+                closeSocket(sock);
             }
         }
     }



Re: svn commit: r1747408 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Posted by Raúl Gutiérrez Segalés <rg...@apache.org>.
Hey Pat,

On 14 June 2016 at 21:18, Patrick Hunt <ph...@apache.org> wrote:

> Raul, what's the status on this? It looks like the fix was committed to
> trunk but not to the 3.5 branch. We don't want to lose track of it.
>

Yup - I was waiting on Chris before pushing to 3.5.

@Chris: may I push?


-rgs



>
> Patrick
>
> On Wed, Jun 8, 2016 at 8:43 AM, <rg...@apache.org> wrote:
>
>> Author: rgs
>> Date: Wed Jun  8 15:43:15 2016
>> New Revision: 1747408
>>
>> URL: http://svn.apache.org/viewvc?rev=1747408&view=rev
>> Log:
>> ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne
>> (Michael Han via rgs)
>>
>> Modified:
>>     zookeeper/trunk/CHANGES.txt
>>
>> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
>>
>> Modified: zookeeper/trunk/CHANGES.txt
>> URL:
>> http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1747408&r1=1747407&r2=1747408&view=diff
>>
>> ==============================================================================
>> --- zookeeper/trunk/CHANGES.txt (original)
>> +++ zookeeper/trunk/CHANGES.txt Wed Jun  8 15:43:15 2016
>> @@ -308,6 +308,9 @@ BUGFIXES:
>>    ZOOKEEPER-2405: getTGT() in Login.java mishandles confidential
>>    information (Michael Han via phunt)
>>
>> +  ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne
>> +  (Michael Han via rgs)
>> +
>>  IMPROVEMENTS:
>>    ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir
>> Lev-Ari via shralex)
>>
>>
>> Modified:
>> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
>> URL:
>> http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1747408&r1=1747407&r2=1747408&view=diff
>>
>> ==============================================================================
>> ---
>> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
>> (original)
>> +++
>> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
>> Wed Jun  8 15:43:15 2016
>> @@ -237,9 +237,7 @@ public class QuorumCnxManager {
>>       * @param sid
>>       */
>>      public void testInitiateConnection(long sid) throws Exception {
>> -        if (LOG.isDebugEnabled()) {
>> -            LOG.debug("Opening channel to server " + sid);
>> -        }
>> +        LOG.debug("Opening channel to server " + sid);
>>          Socket sock = new Socket();
>>          setSockOpts(sock);
>>          sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
>> @@ -434,17 +432,14 @@ public class QuorumCnxManager {
>>              LOG.debug("There is a connection already for server " + sid);
>>              return true;
>>          }
>> -        try {
>>
>> -             if (LOG.isDebugEnabled()) {
>> -                 LOG.debug("Opening channel to server " + sid);
>> -             }
>> -             Socket sock = new Socket();
>> +        Socket sock = null;
>> +        try {
>> +             LOG.debug("Opening channel to server " + sid);
>> +             sock = new Socket();
>>               setSockOpts(sock);
>>               sock.connect(electionAddr, cnxTO);
>> -             if (LOG.isDebugEnabled()) {
>> -                 LOG.debug("Connected to server " + sid);
>> -             }
>> +             LOG.debug("Connected to server " + sid);
>>               initiateConnection(sock, sid);
>>               return true;
>>           } catch (UnresolvedAddressException e) {
>> @@ -454,11 +449,13 @@ public class QuorumCnxManager {
>>               // detail.
>>               LOG.warn("Cannot open channel to " + sid
>>                       + " at election address " + electionAddr, e);
>> +             closeSocket(sock);
>>               throw e;
>>           } catch (IOException e) {
>>               LOG.warn("Cannot open channel to " + sid
>>                       + " at election address " + electionAddr,
>>                       e);
>> +             closeSocket(sock);
>>               return false;
>>           }
>>
>> @@ -574,6 +571,10 @@ public class QuorumCnxManager {
>>       *            Reference to socket
>>       */
>>      private void closeSocket(Socket sock) {
>> +        if (sock == null) {
>> +            return;
>> +        }
>> +
>>          try {
>>              sock.close();
>>          } catch (IOException ie) {
>> @@ -614,7 +615,7 @@ public class QuorumCnxManager {
>>          public void run() {
>>              int numRetries = 0;
>>              InetSocketAddress addr;
>> -
>> +            Socket client = null;
>>              while((!shutdown) && (numRetries < 3)){
>>                  try {
>>                      ss = new ServerSocket();
>> @@ -632,7 +633,7 @@ public class QuorumCnxManager {
>>                      setName(addr.toString());
>>                      ss.bind(addr);
>>                      while (!shutdown) {
>> -                        Socket client = ss.accept();
>> +                        client = ss.accept();
>>                          setSockOpts(client);
>>                          LOG.info("Received connection request "
>>                                  + client.getRemoteSocketAddress());
>> @@ -654,6 +655,7 @@ public class QuorumCnxManager {
>>                          LOG.error("Interrupted while sleeping. " +
>>                              "Ignoring exception", ie);
>>                      }
>> +                    closeSocket(client);
>>                  }
>>              }
>>              LOG.info("Leaving listener");
>> @@ -739,9 +741,7 @@ public class QuorumCnxManager {
>>          }
>>
>>          synchronized boolean finish() {
>> -            if (LOG.isDebugEnabled()) {
>> -                LOG.debug("Calling finish for " + sid);
>> -            }
>> +            LOG.debug("Calling finish for " + sid);
>>
>>              if(!running){
>>                  /*
>> @@ -752,16 +752,14 @@ public class QuorumCnxManager {
>>
>>              running = false;
>>              closeSocket(sock);
>> -            // channel = null;
>>
>>              this.interrupt();
>>              if (recvWorker != null) {
>>                  recvWorker.finish();
>>              }
>>
>> -            if (LOG.isDebugEnabled()) {
>> -                LOG.debug("Removing entry from senderWorkerMap sid=" +
>> sid);
>> -            }
>> +            LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
>> +
>>              senderWorkerMap.remove(sid, this);
>>              threadCnt.decrementAndGet();
>>              return running;
>> @@ -919,9 +917,7 @@ public class QuorumCnxManager {
>>              } finally {
>>                  LOG.warn("Interrupting SendWorker");
>>                  sw.finish();
>> -                if (sock != null) {
>> -                    closeSocket(sock);
>> -                }
>> +                closeSocket(sock);
>>              }
>>          }
>>      }
>>
>>
>>
>

Re: svn commit: r1747408 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Posted by Patrick Hunt <ph...@apache.org>.
Raul, what's the status on this? It looks like the fix was committed to
trunk but not to the 3.5 branch. We don't want to lose track of it.

Patrick

On Wed, Jun 8, 2016 at 8:43 AM, <rg...@apache.org> wrote:

> Author: rgs
> Date: Wed Jun  8 15:43:15 2016
> New Revision: 1747408
>
> URL: http://svn.apache.org/viewvc?rev=1747408&view=rev
> Log:
> ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne
> (Michael Han via rgs)
>
> Modified:
>     zookeeper/trunk/CHANGES.txt
>
> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
>
> Modified: zookeeper/trunk/CHANGES.txt
> URL:
> http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1747408&r1=1747407&r2=1747408&view=diff
>
> ==============================================================================
> --- zookeeper/trunk/CHANGES.txt (original)
> +++ zookeeper/trunk/CHANGES.txt Wed Jun  8 15:43:15 2016
> @@ -308,6 +308,9 @@ BUGFIXES:
>    ZOOKEEPER-2405: getTGT() in Login.java mishandles confidential
>    information (Michael Han via phunt)
>
> +  ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne
> +  (Michael Han via rgs)
> +
>  IMPROVEMENTS:
>    ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir
> Lev-Ari via shralex)
>
>
> Modified:
> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
> URL:
> http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1747408&r1=1747407&r2=1747408&view=diff
>
> ==============================================================================
> ---
> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
> (original)
> +++
> zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
> Wed Jun  8 15:43:15 2016
> @@ -237,9 +237,7 @@ public class QuorumCnxManager {
>       * @param sid
>       */
>      public void testInitiateConnection(long sid) throws Exception {
> -        if (LOG.isDebugEnabled()) {
> -            LOG.debug("Opening channel to server " + sid);
> -        }
> +        LOG.debug("Opening channel to server " + sid);
>          Socket sock = new Socket();
>          setSockOpts(sock);
>          sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
> @@ -434,17 +432,14 @@ public class QuorumCnxManager {
>              LOG.debug("There is a connection already for server " + sid);
>              return true;
>          }
> -        try {
>
> -             if (LOG.isDebugEnabled()) {
> -                 LOG.debug("Opening channel to server " + sid);
> -             }
> -             Socket sock = new Socket();
> +        Socket sock = null;
> +        try {
> +             LOG.debug("Opening channel to server " + sid);
> +             sock = new Socket();
>               setSockOpts(sock);
>               sock.connect(electionAddr, cnxTO);
> -             if (LOG.isDebugEnabled()) {
> -                 LOG.debug("Connected to server " + sid);
> -             }
> +             LOG.debug("Connected to server " + sid);
>               initiateConnection(sock, sid);
>               return true;
>           } catch (UnresolvedAddressException e) {
> @@ -454,11 +449,13 @@ public class QuorumCnxManager {
>               // detail.
>               LOG.warn("Cannot open channel to " + sid
>                       + " at election address " + electionAddr, e);
> +             closeSocket(sock);
>               throw e;
>           } catch (IOException e) {
>               LOG.warn("Cannot open channel to " + sid
>                       + " at election address " + electionAddr,
>                       e);
> +             closeSocket(sock);
>               return false;
>           }
>
> @@ -574,6 +571,10 @@ public class QuorumCnxManager {
>       *            Reference to socket
>       */
>      private void closeSocket(Socket sock) {
> +        if (sock == null) {
> +            return;
> +        }
> +
>          try {
>              sock.close();
>          } catch (IOException ie) {
> @@ -614,7 +615,7 @@ public class QuorumCnxManager {
>          public void run() {
>              int numRetries = 0;
>              InetSocketAddress addr;
> -
> +            Socket client = null;
>              while((!shutdown) && (numRetries < 3)){
>                  try {
>                      ss = new ServerSocket();
> @@ -632,7 +633,7 @@ public class QuorumCnxManager {
>                      setName(addr.toString());
>                      ss.bind(addr);
>                      while (!shutdown) {
> -                        Socket client = ss.accept();
> +                        client = ss.accept();
>                          setSockOpts(client);
>                          LOG.info("Received connection request "
>                                  + client.getRemoteSocketAddress());
> @@ -654,6 +655,7 @@ public class QuorumCnxManager {
>                          LOG.error("Interrupted while sleeping. " +
>                              "Ignoring exception", ie);
>                      }
> +                    closeSocket(client);
>                  }
>              }
>              LOG.info("Leaving listener");
> @@ -739,9 +741,7 @@ public class QuorumCnxManager {
>          }
>
>          synchronized boolean finish() {
> -            if (LOG.isDebugEnabled()) {
> -                LOG.debug("Calling finish for " + sid);
> -            }
> +            LOG.debug("Calling finish for " + sid);
>
>              if(!running){
>                  /*
> @@ -752,16 +752,14 @@ public class QuorumCnxManager {
>
>              running = false;
>              closeSocket(sock);
> -            // channel = null;
>
>              this.interrupt();
>              if (recvWorker != null) {
>                  recvWorker.finish();
>              }
>
> -            if (LOG.isDebugEnabled()) {
> -                LOG.debug("Removing entry from senderWorkerMap sid=" +
> sid);
> -            }
> +            LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
> +
>              senderWorkerMap.remove(sid, this);
>              threadCnt.decrementAndGet();
>              return running;
> @@ -919,9 +917,7 @@ public class QuorumCnxManager {
>              } finally {
>                  LOG.warn("Interrupting SendWorker");
>                  sw.finish();
> -                if (sock != null) {
> -                    closeSocket(sock);
> -                }
> +                closeSocket(sock);
>              }
>          }
>      }
>
>
>