You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/09/09 23:51:03 UTC

kudu git commit: java: fix leak of TabletClient objects in client2tablets map

Repository: kudu
Updated Branches:
  refs/heads/master c386f73a9 -> d5082d8ec


java: fix leak of TabletClient objects in client2tablets map

After YCSB had been running for a week, most of the clients had hit
OutOfMemoryErrors (OOMEs). Heap dump analysis showed that the
client2tablets map held hundreds of thousands of leaked TabletClient
objects.

We were neglecting to remove the client from the client2tablets map
when it disconnected. This patch fixes the issue and adds a regression
test that reproduces the bug.

This patch was written jointly by Todd Lipcon and David Alves.

Change-Id: I302650f2a6526e7c51537264137a4f00cbbda073
Reviewed-on: http://gerrit.cloudera.org:8080/4119
Tested-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d5082d8e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d5082d8e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d5082d8e

Branch: refs/heads/master
Commit: d5082d8ec1218e3f3bd2143d117ddd64772a6162
Parents: c386f73
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Aug 24 16:41:28 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Sep 9 23:47:56 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 28 ++++++++++++-
 .../org/apache/kudu/client/BaseKuduTest.java    |  2 +-
 .../org/apache/kudu/client/MiniKuduCluster.java |  4 +-
 .../kudu/client/TestAsyncKuduSession.java       | 43 ++++++++++++++++++++
 4 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d5082d8e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index c42bd2f..a4da389 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -167,7 +167,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * Maps a client connected to a TabletServer to the list of tablets we know
    * it's serving so far.
    */
-  private final ConcurrentHashMap<TabletClient, ArrayList<RemoteTablet>> client2tablets =
+  @VisibleForTesting
+  final ConcurrentHashMap<TabletClient, ArrayList<RemoteTablet>> client2tablets =
       new ConcurrentHashMap<>();
 
   /**
@@ -1528,7 +1529,8 @@ public class AsyncKuduClient implements AutoCloseable {
       final TabletClientPipeline pipeline = new TabletClientPipeline();
       client = pipeline.init(uuid, host, port);
       chan = channelFactory.newChannel(pipeline);
-      ip2client.put(hostport, client);  // This is guaranteed to return null.
+      TabletClient oldClient = ip2client.put(hostport, client);
+      assert oldClient == null;
 
       // The client2tables map is assumed to contain `client` after it is published in ip2client.
       this.client2tablets.put(client, new ArrayList<RemoteTablet>());
@@ -1785,7 +1787,29 @@ public class AsyncKuduClient implements AutoCloseable {
       LOG.trace("When expiring " + client + " from the client cache (host:port="
           + hostport + "), it was found that there was no entry"
           + " corresponding to " + remote + ".  This shouldn't happen.");
+    } else {
+      removeClient(old);
+    }
+  }
+
+  /**
+   * Remove all references to a client from client2tablets and from the remoteTablets
+   * that reference it.
+   * @param client tablet client to remove
+   */
+  void removeClient(final TabletClient client) {
+    ArrayList<RemoteTablet> tablets = client2tablets.get(client);
+    if (tablets != null) {
+      // Make a copy so we don't need to synchronize on it while iterating.
+      RemoteTablet[] tablets_copy;
+      synchronized (tablets) {
+        tablets_copy = tablets.toArray(new RemoteTablet[tablets.size()]);
+      }
+      for (final RemoteTablet remoteTablet : tablets_copy) {
+        remoteTablet.removeTabletClient(client);
+      }
     }
+    client2tablets.remove(client);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/d5082d8e/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 967c735..41015a4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.fail;
 
 public class BaseKuduTest {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseKuduTest.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(BaseKuduTest.class);
 
   private static final String NUM_MASTERS_PROP = "NUM_MASTERS";
   private static final int NUM_TABLET_SERVERS = 3;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d5082d8e/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 34e44bf..386ce9e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -311,11 +311,11 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
-   * Restarts the dead tablet servers on the port.
-   * @throws Exception
+   * Restarts any tablet servers which were previously killed.
    */
   public void restartDeadTabletServers() throws Exception {
     for (int port : tserverPorts) {
+      if (tserverProcesses.containsKey(port)) continue;
       restartDeadTabletServerOnPort(port);
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d5082d8e/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index cf14b7c..15ce59b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -172,6 +172,49 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     }
   }
 
+  /**
+   * Regression test for a bug in which, when a tablet client is disconnected
+   * and we reconnect, we were previously leaking the old TabletClient
+   * object in the client2tablets map.
+   */
+  @Test(timeout = 100000)
+  public void testRestartBetweenWrites() throws Exception {
+    // Create a non-replicated table for this test, so that
+    // we're sure when we reconnect to the leader after restarting
+    // the tablet servers, it's definitely the same leader we wrote
+    // to before.
+    KuduTable nonReplicatedTable = createTable(
+        "non-replicated",
+        basicSchema,
+        getBasicCreateTableOptions().setNumReplicas(1));
+
+    try {
+      // Write before doing any restarts to establish a connection.
+      AsyncKuduSession session = client.newSession();
+      session.setTimeoutMillis(30000);
+      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+      session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
+
+      int numClientsBefore = client.client2tablets.size();
+
+      // Restart all the tablet servers.
+      killTabletServers();
+      restartTabletServers();
+
+      // Perform another write, which will require reconnecting to the same
+      // tablet server that we wrote to above.
+      session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
+
+      // We should not have leaked an entry in the client2tablets map.
+      int numClientsAfter = client.client2tablets.size();
+      assertEquals(numClientsBefore, numClientsAfter);
+    } finally {
+      restartTabletServers();
+
+      client.deleteTable("non-replicated").join();
+    }
+  }
+
   @Test(timeout = 100000)
   public void test() throws Exception {