You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2016/10/13 23:22:47 UTC

[1/3] kudu git commit: [c++ client] use default RPC timeout for batchers

Repository: kudu
Updated Branches:
  refs/heads/master 7bcfcf64c -> 1746ae021


[c++ client] use default RPC timeout for batchers

If not overridden by KuduSession::SetTimeoutMillis(), use the per-client
default RPC timeout (KuduClient::default_rpc_timeout()) for RPC
operations initiated by batchers. Previously, a hard-coded timeout
of 60 seconds was used instead.

Change-Id: Ic44acd5e2723295e115677f83e3fcf4074a61e52
Reviewed-on: http://gerrit.cloudera.org:8080/4703
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5195ce57
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5195ce57
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5195ce57

Branch: refs/heads/master
Commit: 5195ce573850653e0e53094cdd35a1da93d33444
Parents: 7bcfcf6
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Oct 12 15:14:51 2016 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Oct 13 00:18:09 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5195ce57/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index d7c4dca..9bb36dc 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -389,7 +389,7 @@ Batcher::Batcher(KuduClient* client,
     had_errors_(false),
     flush_callback_(nullptr),
     next_op_sequence_number_(0),
-    timeout_(MonoDelta::FromSeconds(60)),
+    timeout_(client->default_rpc_timeout()),
     outstanding_lookups_(0),
     buffer_bytes_used_(0) {
 }


[2/3] kudu git commit: KUDU-1682. Lock contention on table locations cache in Java client

Posted by al...@apache.org.
KUDU-1682. Lock contention on table locations cache in Java client

Change-Id: I0f6ba8f4fced6f043f7132fd11078044b004ea21
Reviewed-on: http://gerrit.cloudera.org:8080/4706
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb9526f9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb9526f9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb9526f9

Branch: refs/heads/master
Commit: fb9526f9ddd53066b617a6d7b6d1478eaa50fe96
Parents: 5195ce5
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Oct 12 16:58:26 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Oct 13 15:52:36 2016 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/TableLocationsCache.java | 29 ++++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fb9526f9/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
index 0c27bfa..ec7fb48 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -45,22 +46,31 @@ class TableLocationsCache {
   private static final Logger LOG = LoggerFactory.getLogger(TableLocationsCache.class);
   private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
-  private final Object monitor = new Object();
+  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
 
-  @GuardedBy("monitor")
+  @GuardedBy("rwl")
   private final NavigableMap<byte[], Entry> entries = new TreeMap<>(COMPARATOR);
 
   public Entry get(byte[] partitionKey) {
+
     if (partitionKey == null) {
       // Master lookup.
-      synchronized (monitor) {
+      rwl.readLock().lock();
+      try {
         Preconditions.checkState(entries.size() <= 1);
         return entries.get(AsyncKuduClient.EMPTY_ARRAY);
+      } finally {
+        rwl.readLock().unlock();
       }
+
     }
+
     Map.Entry<byte[], Entry> entry;
-    synchronized (monitor) {
+    rwl.readLock().lock();
+    try {
       entry = entries.floorEntry(partitionKey);
+    } finally {
+      rwl.readLock().unlock();
     }
 
     if (entry == null ||
@@ -89,9 +99,13 @@ class TableLocationsCache {
       // Master lookup.
       Preconditions.checkArgument(tablets.size() == 1);
       Entry entry = Entry.tablet(tablets.get(0), TimeUnit.DAYS.toMillis(1));
-      synchronized (monitor) {
+
+      rwl.writeLock().lock();
+      try {
         entries.clear();
         entries.put(AsyncKuduClient.EMPTY_ARRAY, entry);
+      } finally {
+        rwl.writeLock().unlock();
       }
       return;
     }
@@ -164,7 +178,8 @@ class TableLocationsCache {
 
     LOG.debug("Discovered table locations:\t{}", newEntries);
 
-    synchronized (monitor) {
+    rwl.writeLock().lock();
+    try {
       // Remove all existing overlapping entries, and add the new entries.
       Map.Entry<byte[], Entry> floorEntry = entries.floorEntry(discoveredlowerBound);
       if (floorEntry != null &&
@@ -186,6 +201,8 @@ class TableLocationsCache {
       for (Entry entry : newEntries) {
         entries.put(entry.getLowerBoundPartitionKey(), entry);
       }
+    } finally {
+      rwl.writeLock().unlock();
     }
   }
 


[3/3] kudu git commit: [java client] Cleanup AsyncKuduClient's unused caches

Posted by al...@apache.org.
[java client] Cleanup AsyncKuduClient's unused caches

Originally, asynchbase came with a few caches such as server-connection->region and
region->server-connection. Via dozens of patches, we've reduced their usefulness to 0.
This patch simply finishes the job and removes them.

FWIW, client2tablets was always a source of pain; with it gone, the caching logic is a lot
easier to manage, and potentially to refactor!

Change-Id: I62802c34c618c83a4ff69d79825387cbe4ab51a8
Reviewed-on: http://gerrit.cloudera.org:8080/4705
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1746ae02
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1746ae02
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1746ae02

Branch: refs/heads/master
Commit: 1746ae0210ef1340346edae7e154ab5254fccda8
Parents: fb9526f
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Oct 12 16:07:57 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Oct 13 23:11:34 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 98 ++++----------------
 .../kudu/client/TestAsyncKuduSession.java       |  4 +-
 2 files changed, 22 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1746ae02/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 84e60de..cbb128e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -142,43 +142,18 @@ public class AsyncKuduClient implements AutoCloseable {
   private final ClientSocketChannelFactory channelFactory;
 
   /**
-   * This map and the next 2 maps contain data cached from calls to the master's
-   * GetTableLocations RPC. There is no consistency guarantee across the maps.
-   * They are not updated all at the same time atomically.
-   *
-   * {@code tableLocations} is always the first to be updated because it's the
-   * map from which all the lookups are done in the fast-path of the requests
-   * that need to locate a tablet. {@code tablet2client} is updated second,
-   * because it comes second in the fast-path of every requests that need to
-   * locate a tablet. {@code client2tablets} is only used to handle TabletServer
-   * disconnections gracefully.
-   *
-   * This map is keyed by table ID.
+   * This map contains data cached from calls to the master's
+   * GetTableLocations RPC. This map is keyed by table ID.
    */
   private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
       new ConcurrentHashMap<>();
 
   /**
-   * Maps a tablet ID to the RemoteTablet that knows where all the replicas are served.
-   */
-  private final ConcurrentHashMap<Slice, RemoteTablet> tablet2client = new ConcurrentHashMap<>();
-
-  /**
-   * Maps a client connected to a TabletServer to the list of tablets we know
-   * it's serving so far.
-   */
-  @VisibleForTesting
-  final ConcurrentHashMap<TabletClient, ArrayList<RemoteTablet>> client2tablets =
-      new ConcurrentHashMap<>();
-
-  /**
    * Cache that maps a TabletServer address ("ip:port") to the clients
    * connected to it.
    * <p>
    * Access to this map must be synchronized by locking its monitor.
-   * Lock ordering: when locking both this map and a TabletClient, the
-   * TabletClient must always be locked first to avoid deadlocks.  Logging
-   * the contents of this map (or calling toString) requires copying it first.
+   * Logging the contents of this map (or calling toString) requires copying it first.
    * <p>
    * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
    * (just when connecting to / disconnecting from TabletClients) and when we
@@ -197,11 +172,12 @@ public class AsyncKuduClient implements AutoCloseable {
    * that are going to cause unnecessary errors.
    * @see TabletClientPipeline#handleDisconnect
    */
-  private final HashMap<String, TabletClient> ip2client =
-      new HashMap<String, TabletClient>();
+  @VisibleForTesting
+  @GuardedBy("ip2client")
+  final HashMap<String, TabletClient> ip2client = new HashMap<>();
 
   @GuardedBy("sessions")
-  private final Set<AsyncKuduSession> sessions = new HashSet<AsyncKuduSession>();
+  private final Set<AsyncKuduSession> sessions = new HashSet<>();
 
   // Since the masters also go through TabletClient, we need to treat them as if they were a normal
   // table. We'll use the following fake table name to identify places where we need special
@@ -219,9 +195,12 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   private long lastPropagatedTimestamp = NO_TIMESTAMP;
 
-  // A table is considered not served when we get an empty list of locations but know
-  // that a tablet exists. This is currently only used for new tables. The objects stored are
-  // table IDs.
+  /**
+   * A table is considered not served when we get a TABLET_NOT_RUNNING error from the master
+   * after calling GetTableLocations (it means that some tablets aren't ready to serve yet).
+   * We cache this information so that concurrent RPCs sent just after creating a table don't
+   * all try to hit the master for no good reason.
+   */
   private final Set<String> tablesNotServed = Collections.newSetFromMap(new
       ConcurrentHashMap<String, Boolean>());
 
@@ -989,8 +968,7 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Clears {@link #tableLocations} and {@link #tablet2client} of the table's
-   * entries.
+   * Clears {@link #tableLocations} of the table's entries.
    *
    * This method makes the maps momentarily inconsistent, and should only be
    * used when the {@code AsyncKuduClient} is in a steady state.
@@ -1000,12 +978,6 @@ public class AsyncKuduClient implements AutoCloseable {
   @VisibleForTesting
   void emptyTabletsCacheForTable(String tableId) {
     tableLocations.remove(tableId);
-    Set<Map.Entry<Slice, RemoteTablet>> tablets = tablet2client.entrySet();
-    for (Map.Entry<Slice, RemoteTablet> entry : tablets) {
-      if (entry.getValue().getTableId().equals(tableId)) {
-        tablets.remove(entry);
-      }
-    }
   }
 
   TabletClient clientFor(RemoteTablet tablet) {
@@ -1384,26 +1356,10 @@ public class AsyncKuduClient implements AutoCloseable {
     // already discovered the tablet, its locations are refreshed.
     List<RemoteTablet> tablets = new ArrayList<>(locations.size());
     for (Master.TabletLocationsPB tabletPb : locations) {
-      // Early creating the tablet so that it parses out the pb.
       RemoteTablet rt = createTabletFromPb(tableId, tabletPb);
       Slice tabletId = rt.tabletId;
 
-      // If we already know about this tablet, refresh the locations.
-      RemoteTablet currentTablet = tablet2client.get(tabletId);
-      if (currentTablet != null) {
-        currentTablet.refreshTabletClients(tabletPb);
-        tablets.add(currentTablet);
-        continue;
-      }
-
-      // Putting it here first doesn't make it visible because tabletsCache is always looked up
-      // first.
-      RemoteTablet oldRt = tablet2client.putIfAbsent(tabletId, rt);
-      if (oldRt != null) {
-        // someone beat us to it
-        continue;
-      }
-      LOG.info("Discovered tablet {} for table '{}' with partition {}",
+      LOG.info("Learned about tablet {} for table '{}' with partition {}",
                tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
       rt.refreshTabletClients(tabletPb);
       tablets.add(rt);
@@ -1546,9 +1502,6 @@ public class AsyncKuduClient implements AutoCloseable {
       chan = channelFactory.newChannel(pipeline);
       TabletClient oldClient = ip2client.put(hostport, client);
       assert oldClient == null;
-
-      // The client2tables map is assumed to contain `client` after it is published in ip2client.
-      this.client2tablets.put(client, new ArrayList<RemoteTablet>());
     }
     final SocketChannelConfig config = chan.getConfig();
     config.setConnectTimeoutMillis(5000);
@@ -1792,7 +1745,6 @@ public class AsyncKuduClient implements AutoCloseable {
     TabletClient old;
     synchronized (ip2client) {
       old = ip2client.remove(hostport);
-      client2tablets.remove(client);
     }
 
     LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
@@ -2050,8 +2002,8 @@ public class AsyncKuduClient implements AutoCloseable {
       replicas.set(replicasBuilder.build());
     }
 
-    // Must be called with tabletServers synchronized
-    void addTabletClient(String uuid, String host, int port, boolean isLeader)
+    @GuardedBy("tabletServers")
+    private void addTabletClient(String uuid, String host, int port, boolean isLeader)
         throws UnknownHostException {
       String ip = getIP(host);
       if (ip == null) {
@@ -2059,19 +2011,9 @@ public class AsyncKuduClient implements AutoCloseable {
       }
       TabletClient client = newClient(uuid, ip, port);
 
-      ArrayList<RemoteTablet> tablets = client2tablets.get(client);
-
-      if (tablets == null) {
-        // We lost a race, someone removed the client we received.
-        return;
-      }
-
-      synchronized (tablets) {
-        tabletServers.add(client);
-        if (isLeader) {
-          leaderIndex = tabletServers.size() - 1;
-        }
-        tablets.add(this);
+      tabletServers.add(client);
+      if (isLeader) {
+        leaderIndex = tabletServers.size() - 1;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1746ae02/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 15ce59b..2de16fd 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -195,7 +195,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
       session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
 
-      int numClientsBefore = client.client2tablets.size();
+      int numClientsBefore = client.ip2client.size();
 
       // Restart all the tablet servers.
       killTabletServers();
@@ -206,7 +206,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
       session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
 
       // We should not have leaked an entry in the client2tablets map.
-      int numClientsAfter = client.client2tablets.size();
+      int numClientsAfter = client.ip2client.size();
       assertEquals(numClientsBefore, numClientsAfter);
     } finally {
       restartTabletServers();