You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2016/10/13 23:22:47 UTC
[1/3] kudu git commit: [c++ client] use default RPC timeout for
batchers
Repository: kudu
Updated Branches:
refs/heads/master 7bcfcf64c -> 1746ae021
[c++ client] use default RPC timeout for batchers
If not overridden by KuduSession::SetTimeoutMillis(), use the per-client
default RPC timeout (KuduClient::default_rpc_timeout()) for RPC
operations initiated by batchers. Previously, the hard-coded setting
of 60 seconds was used instead.
Change-Id: Ic44acd5e2723295e115677f83e3fcf4074a61e52
Reviewed-on: http://gerrit.cloudera.org:8080/4703
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5195ce57
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5195ce57
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5195ce57
Branch: refs/heads/master
Commit: 5195ce573850653e0e53094cdd35a1da93d33444
Parents: 7bcfcf6
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Oct 12 15:14:51 2016 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Oct 13 00:18:09 2016 +0000
----------------------------------------------------------------------
src/kudu/client/batcher.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/5195ce57/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index d7c4dca..9bb36dc 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -389,7 +389,7 @@ Batcher::Batcher(KuduClient* client,
had_errors_(false),
flush_callback_(nullptr),
next_op_sequence_number_(0),
- timeout_(MonoDelta::FromSeconds(60)),
+ timeout_(client->default_rpc_timeout()),
outstanding_lookups_(0),
buffer_bytes_used_(0) {
}
[2/3] kudu git commit: KUDU-1682. Lock contention on table locations
cache in Java client
Posted by al...@apache.org.
KUDU-1682. Lock contention on table locations cache in Java client
Change-Id: I0f6ba8f4fced6f043f7132fd11078044b004ea21
Reviewed-on: http://gerrit.cloudera.org:8080/4706
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb9526f9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb9526f9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb9526f9
Branch: refs/heads/master
Commit: fb9526f9ddd53066b617a6d7b6d1478eaa50fe96
Parents: 5195ce5
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Oct 12 16:58:26 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Oct 13 15:52:36 2016 +0000
----------------------------------------------------------------------
.../apache/kudu/client/TableLocationsCache.java | 29 ++++++++++++++++----
1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fb9526f9/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
index 0c27bfa..ec7fb48 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@@ -45,22 +46,31 @@ class TableLocationsCache {
private static final Logger LOG = LoggerFactory.getLogger(TableLocationsCache.class);
private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
- private final Object monitor = new Object();
+ private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- @GuardedBy("monitor")
+ @GuardedBy("rwl")
private final NavigableMap<byte[], Entry> entries = new TreeMap<>(COMPARATOR);
public Entry get(byte[] partitionKey) {
+
if (partitionKey == null) {
// Master lookup.
- synchronized (monitor) {
+ rwl.readLock().lock();
+ try {
Preconditions.checkState(entries.size() <= 1);
return entries.get(AsyncKuduClient.EMPTY_ARRAY);
+ } finally {
+ rwl.readLock().unlock();
}
+
}
+
Map.Entry<byte[], Entry> entry;
- synchronized (monitor) {
+ rwl.readLock().lock();
+ try {
entry = entries.floorEntry(partitionKey);
+ } finally {
+ rwl.readLock().unlock();
}
if (entry == null ||
@@ -89,9 +99,13 @@ class TableLocationsCache {
// Master lookup.
Preconditions.checkArgument(tablets.size() == 1);
Entry entry = Entry.tablet(tablets.get(0), TimeUnit.DAYS.toMillis(1));
- synchronized (monitor) {
+
+ rwl.writeLock().lock();
+ try {
entries.clear();
entries.put(AsyncKuduClient.EMPTY_ARRAY, entry);
+ } finally {
+ rwl.writeLock().unlock();
}
return;
}
@@ -164,7 +178,8 @@ class TableLocationsCache {
LOG.debug("Discovered table locations:\t{}", newEntries);
- synchronized (monitor) {
+ rwl.writeLock().lock();
+ try {
// Remove all existing overlapping entries, and add the new entries.
Map.Entry<byte[], Entry> floorEntry = entries.floorEntry(discoveredlowerBound);
if (floorEntry != null &&
@@ -186,6 +201,8 @@ class TableLocationsCache {
for (Entry entry : newEntries) {
entries.put(entry.getLowerBoundPartitionKey(), entry);
}
+ } finally {
+ rwl.writeLock().unlock();
}
}
[3/3] kudu git commit: [java client] Cleanup AsyncKuduClient's unused
caches
Posted by al...@apache.org.
[java client] Cleanup AsyncKuduClient's unused caches
Originally, asynchbase came with a few caches such as server-connection->region and
region->server-connection. Via dozens of patches, we've reduced their usefulness to 0.
This patch simply finishes the job and removes them.
FWIW client2tablets was always a source of pain, and the caching logic is now a lot
easier to manage, and potentially refactor!
Change-Id: I62802c34c618c83a4ff69d79825387cbe4ab51a8
Reviewed-on: http://gerrit.cloudera.org:8080/4705
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1746ae02
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1746ae02
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1746ae02
Branch: refs/heads/master
Commit: 1746ae0210ef1340346edae7e154ab5254fccda8
Parents: fb9526f
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Oct 12 16:07:57 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Oct 13 23:11:34 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 98 ++++----------------
.../kudu/client/TestAsyncKuduSession.java | 4 +-
2 files changed, 22 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/1746ae02/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 84e60de..cbb128e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -142,43 +142,18 @@ public class AsyncKuduClient implements AutoCloseable {
private final ClientSocketChannelFactory channelFactory;
/**
- * This map and the next 2 maps contain data cached from calls to the master's
- * GetTableLocations RPC. There is no consistency guarantee across the maps.
- * They are not updated all at the same time atomically.
- *
- * {@code tableLocations} is always the first to be updated because it's the
- * map from which all the lookups are done in the fast-path of the requests
- * that need to locate a tablet. {@code tablet2client} is updated second,
- * because it comes second in the fast-path of every requests that need to
- * locate a tablet. {@code client2tablets} is only used to handle TabletServer
- * disconnections gracefully.
- *
- * This map is keyed by table ID.
+ * This map contains data cached from calls to the master's
+ * GetTableLocations RPC. This map is keyed by table ID.
*/
private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
new ConcurrentHashMap<>();
/**
- * Maps a tablet ID to the RemoteTablet that knows where all the replicas are served.
- */
- private final ConcurrentHashMap<Slice, RemoteTablet> tablet2client = new ConcurrentHashMap<>();
-
- /**
- * Maps a client connected to a TabletServer to the list of tablets we know
- * it's serving so far.
- */
- @VisibleForTesting
- final ConcurrentHashMap<TabletClient, ArrayList<RemoteTablet>> client2tablets =
- new ConcurrentHashMap<>();
-
- /**
* Cache that maps a TabletServer address ("ip:port") to the clients
* connected to it.
* <p>
* Access to this map must be synchronized by locking its monitor.
- * Lock ordering: when locking both this map and a TabletClient, the
- * TabletClient must always be locked first to avoid deadlocks. Logging
- * the contents of this map (or calling toString) requires copying it first.
+ * Logging the contents of this map (or calling toString) requires copying it first.
* <p>
* This isn't a {@link ConcurrentHashMap} because we don't use it frequently
* (just when connecting to / disconnecting from TabletClients) and when we
@@ -197,11 +172,12 @@ public class AsyncKuduClient implements AutoCloseable {
* that are going to cause unnecessary errors.
* @see TabletClientPipeline#handleDisconnect
*/
- private final HashMap<String, TabletClient> ip2client =
- new HashMap<String, TabletClient>();
+ @VisibleForTesting
+ @GuardedBy("ip2client")
+ final HashMap<String, TabletClient> ip2client = new HashMap<>();
@GuardedBy("sessions")
- private final Set<AsyncKuduSession> sessions = new HashSet<AsyncKuduSession>();
+ private final Set<AsyncKuduSession> sessions = new HashSet<>();
// Since the masters also go through TabletClient, we need to treat them as if they were a normal
// table. We'll use the following fake table name to identify places where we need special
@@ -219,9 +195,12 @@ public class AsyncKuduClient implements AutoCloseable {
*/
private long lastPropagatedTimestamp = NO_TIMESTAMP;
- // A table is considered not served when we get an empty list of locations but know
- // that a tablet exists. This is currently only used for new tables. The objects stored are
- // table IDs.
+ /**
+ * A table is considered not served when we get a TABLET_NOT_RUNNING error from the master
+ * after calling GetTableLocations (it means that some tablets aren't ready to serve yet).
+ * We cache this information so that concurrent RPCs sent just after creating a table don't
+ * all try to hit the master for no good reason.
+ */
private final Set<String> tablesNotServed = Collections.newSetFromMap(new
ConcurrentHashMap<String, Boolean>());
@@ -989,8 +968,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Clears {@link #tableLocations} and {@link #tablet2client} of the table's
- * entries.
+ * Clears {@link #tableLocations} of the table's entries.
*
* This method makes the maps momentarily inconsistent, and should only be
* used when the {@code AsyncKuduClient} is in a steady state.
@@ -1000,12 +978,6 @@ public class AsyncKuduClient implements AutoCloseable {
@VisibleForTesting
void emptyTabletsCacheForTable(String tableId) {
tableLocations.remove(tableId);
- Set<Map.Entry<Slice, RemoteTablet>> tablets = tablet2client.entrySet();
- for (Map.Entry<Slice, RemoteTablet> entry : tablets) {
- if (entry.getValue().getTableId().equals(tableId)) {
- tablets.remove(entry);
- }
- }
}
TabletClient clientFor(RemoteTablet tablet) {
@@ -1384,26 +1356,10 @@ public class AsyncKuduClient implements AutoCloseable {
// already discovered the tablet, its locations are refreshed.
List<RemoteTablet> tablets = new ArrayList<>(locations.size());
for (Master.TabletLocationsPB tabletPb : locations) {
- // Early creating the tablet so that it parses out the pb.
RemoteTablet rt = createTabletFromPb(tableId, tabletPb);
Slice tabletId = rt.tabletId;
- // If we already know about this tablet, refresh the locations.
- RemoteTablet currentTablet = tablet2client.get(tabletId);
- if (currentTablet != null) {
- currentTablet.refreshTabletClients(tabletPb);
- tablets.add(currentTablet);
- continue;
- }
-
- // Putting it here first doesn't make it visible because tabletsCache is always looked up
- // first.
- RemoteTablet oldRt = tablet2client.putIfAbsent(tabletId, rt);
- if (oldRt != null) {
- // someone beat us to it
- continue;
- }
- LOG.info("Discovered tablet {} for table '{}' with partition {}",
+ LOG.info("Learned about tablet {} for table '{}' with partition {}",
tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
rt.refreshTabletClients(tabletPb);
tablets.add(rt);
@@ -1546,9 +1502,6 @@ public class AsyncKuduClient implements AutoCloseable {
chan = channelFactory.newChannel(pipeline);
TabletClient oldClient = ip2client.put(hostport, client);
assert oldClient == null;
-
- // The client2tables map is assumed to contain `client` after it is published in ip2client.
- this.client2tablets.put(client, new ArrayList<RemoteTablet>());
}
final SocketChannelConfig config = chan.getConfig();
config.setConnectTimeoutMillis(5000);
@@ -1792,7 +1745,6 @@ public class AsyncKuduClient implements AutoCloseable {
TabletClient old;
synchronized (ip2client) {
old = ip2client.remove(hostport);
- client2tablets.remove(client);
}
LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
@@ -2050,8 +2002,8 @@ public class AsyncKuduClient implements AutoCloseable {
replicas.set(replicasBuilder.build());
}
- // Must be called with tabletServers synchronized
- void addTabletClient(String uuid, String host, int port, boolean isLeader)
+ @GuardedBy("tabletServers")
+ private void addTabletClient(String uuid, String host, int port, boolean isLeader)
throws UnknownHostException {
String ip = getIP(host);
if (ip == null) {
@@ -2059,19 +2011,9 @@ public class AsyncKuduClient implements AutoCloseable {
}
TabletClient client = newClient(uuid, ip, port);
- ArrayList<RemoteTablet> tablets = client2tablets.get(client);
-
- if (tablets == null) {
- // We lost a race, someone removed the client we received.
- return;
- }
-
- synchronized (tablets) {
- tabletServers.add(client);
- if (isLeader) {
- leaderIndex = tabletServers.size() - 1;
- }
- tablets.add(this);
+ tabletServers.add(client);
+ if (isLeader) {
+ leaderIndex = tabletServers.size() - 1;
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1746ae02/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 15ce59b..2de16fd 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -195,7 +195,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
- int numClientsBefore = client.client2tablets.size();
+ int numClientsBefore = client.ip2client.size();
// Restart all the tablet servers.
killTabletServers();
@@ -206,7 +206,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
// We should not have leaked an entry in the client2tablets map.
- int numClientsAfter = client.client2tablets.size();
+ int numClientsAfter = client.ip2client.size();
assertEquals(numClientsBefore, numClientsAfter);
} finally {
restartTabletServers();