You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/11/21 23:05:27 UTC

[1/2] kudu git commit: KUDU-2165 workaround: avoid TSAN warnings on CacheMetrics

Repository: kudu
Updated Branches:
  refs/heads/master 3abca98c5 -> 9269e0abc


KUDU-2165 workaround: avoid TSAN warnings on CacheMetrics

InternalMiniCluster tests were spitting out TSAN warnings where one thread
(associated with one mini-tserver) accesses the CacheMetrics while another
thread is busy restarting a tserver. The restarting tserver ends up
replacing the CacheMetrics instance, which caused TSAN to warn due to the
unprotected concurrent access.

This patch is a relatively simple workaround that just prevents the Cache
from switching CacheMetrics instances once one has been attached. The
underlying metrics are ref-counted, so we shouldn't have any leak or illegal
access. The only downside is that, after a restart, new tservers won't be able
to "attach" to the cache instance. But the cache metrics were already
associated somewhat arbitrarily with one of the tablet servers, so it seems
unlikely that any assertions depend on this functionality.

Along the way, I removed the Cache::NewId() method, which was unused.

Change-Id: Ifc5c6e9306df78c364c8b89651ddcc56b90a924f
Reviewed-on: http://gerrit.cloudera.org:8080/8617
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f343a673
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f343a673
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f343a673

Branch: refs/heads/master
Commit: f343a67306790bdcbd6688a472199d2bb25afa2c
Parents: 3abca98
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Nov 21 11:57:25 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Nov 21 22:41:13 2017 +0000

----------------------------------------------------------------------
 src/kudu/util/cache-test.cc |  7 -------
 src/kudu/util/cache.cc      | 28 +++++++++++++++++++---------
 src/kudu/util/cache.h       |  6 ------
 src/kudu/util/nvm_cache.cc  | 11 ++---------
 4 files changed, 21 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f343a673/src/kudu/util/cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/cache-test.cc b/src/kudu/util/cache-test.cc
index 4a16be0..6baa215 100644
--- a/src/kudu/util/cache-test.cc
+++ b/src/kudu/util/cache-test.cc
@@ -3,7 +3,6 @@
 // found in the LICENSE file.
 
 #include <cassert>
-#include <cstdint>
 #include <cstring>
 #include <memory>
 #include <string>
@@ -238,10 +237,4 @@ TEST_P(CacheTest, HeavyEntries) {
   ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10);
 }
 
-TEST_P(CacheTest, NewId) {
-  uint64_t a = cache_->NewId();
-  uint64_t b = cache_->NewId();
-  ASSERT_NE(a, b);
-}
-
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/f343a673/src/kudu/util/cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/cache.cc b/src/kudu/util/cache.cc
index ff52a82..a2fc80b 100644
--- a/src/kudu/util/cache.cc
+++ b/src/kudu/util/cache.cc
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include <cstdint>
 #include <cstring>
 #include <memory>
 #include <mutex>
@@ -18,7 +19,9 @@
 #include "kudu/gutil/bits.h"
 #include "kudu/gutil/hash/city.h"
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
@@ -29,6 +32,8 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util_prod.h"
 
 #if !defined(__APPLE__)
 #include "kudu/util/nvm_cache.h"
@@ -395,12 +400,14 @@ class ShardedLRUCache : public Cache {
   shared_ptr<MemTracker> mem_tracker_;
   gscoped_ptr<CacheMetrics> metrics_;
   vector<LRUCache*> shards_;
-  MutexType id_mutex_;
-  uint64_t last_id_;
 
   // Number of bits of hash used to determine the shard.
   const int shard_bits_;
 
+  // Protects 'metrics_'. Used only when metrics are set, to ensure
+  // that they are set only once in test environments.
+  MutexType metrics_lock_;
+
   static inline uint32_t HashSlice(const Slice& s) {
     return util_hash::CityHash64(
       reinterpret_cast<const char *>(s.data()), s.size());
@@ -414,8 +421,7 @@ class ShardedLRUCache : public Cache {
 
  public:
   explicit ShardedLRUCache(size_t capacity, const string& id)
-      : last_id_(0),
-        shard_bits_(DetermineShardBits()) {
+      : shard_bits_(DetermineShardBits()) {
     // A cache is often a singleton, so:
     // 1. We reuse its MemTracker if one already exists, and
     // 2. It is directly parented to the root MemTracker.
@@ -455,12 +461,16 @@ class ShardedLRUCache : public Cache {
   virtual Slice Value(Handle* handle) OVERRIDE {
     return reinterpret_cast<LRUHandle*>(handle)->value();
   }
-  virtual uint64_t NewId() OVERRIDE {
-    std::lock_guard<MutexType> l(id_mutex_);
-    return ++(last_id_);
-  }
-
   virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    // TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers
+    // causes TSAN errors. So, we'll ensure that metrics only get attached once, from
+    // whichever server starts first. This has the downside that, in test builds, we won't
+    // get accurate cache metrics, but that's probably better than spurious failures.
+    std::lock_guard<simple_spinlock> l(metrics_lock_);
+    if (metrics_) {
+      CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton";
+      return;
+    }
     metrics_.reset(new CacheMetrics(entity));
     for (LRUCache* cache : shards_) {
       cache->SetMetrics(metrics_.get());

http://git-wip-us.apache.org/repos/asf/kudu/blob/f343a673/src/kudu/util/cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/cache.h b/src/kudu/util/cache.h
index 036b7d1..462307c 100644
--- a/src/kudu/util/cache.h
+++ b/src/kudu/util/cache.h
@@ -129,12 +129,6 @@ class Cache {
   // to it have been released.
   virtual void Erase(const Slice& key) = 0;
 
-  // Return a new numeric id.  May be used by multiple clients who are
-  // sharing the same cache to partition the key space.  Typically the
-  // client will allocate a new id at startup and prepend the id to
-  // its cache keys.
-  virtual uint64_t NewId() = 0;
-
   // Pass a metric entity in order to start recoding metrics.
   virtual void SetMetrics(const scoped_refptr<MetricEntity>& metric_entity) = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/f343a673/src/kudu/util/nvm_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/nvm_cache.cc b/src/kudu/util/nvm_cache.cc
index fef188c..ae8a871 100644
--- a/src/kudu/util/nvm_cache.cc
+++ b/src/kudu/util/nvm_cache.cc
@@ -463,8 +463,6 @@ class ShardedLRUCache : public Cache {
  private:
   gscoped_ptr<CacheMetrics> metrics_;
   vector<NvmLRUCache*> shards_;
-  MutexType id_mutex_;
-  uint64_t last_id_;
   VMEM* vmp_;
 
   static inline uint32_t HashSlice(const Slice& s) {
@@ -477,9 +475,8 @@ class ShardedLRUCache : public Cache {
   }
 
  public:
-  explicit ShardedLRUCache(size_t capacity, const string& id, VMEM* vmp)
-        : last_id_(0),
-          vmp_(vmp) {
+  explicit ShardedLRUCache(size_t capacity, const string& /*id*/, VMEM* vmp)
+        : vmp_(vmp) {
 
     const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
     for (int s = 0; s < kNumShards; s++) {
@@ -521,10 +518,6 @@ class ShardedLRUCache : public Cache {
     return reinterpret_cast<LRUHandle*>(handle)->val_ptr();
   }
 
-  virtual uint64_t NewId() OVERRIDE {
-    std::lock_guard<MutexType> l(id_mutex_);
-    return ++(last_id_);
-  }
   virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
     metrics_.reset(new CacheMetrics(entity));
     for (NvmLRUCache* cache : shards_) {


[2/2] kudu git commit: [java] Add ReplicaSelection in KuduScanToken

Posted by da...@apache.org.
[java] Add ReplicaSelection in KuduScanToken

This patch adds ReplicaSelection to KuduScanToken in the Java client,
so that deserializing a scan token propagates the replica selection
policy chosen by the serializer to the deserialized scanner.

Change-Id: I860fcc73e486642ab5177cfd0dc0bdc98fdc6914
Reviewed-on: http://gerrit.cloudera.org:8080/8559
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9269e0ab
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9269e0ab
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9269e0ab

Branch: refs/heads/master
Commit: 9269e0abc39c5143e5c7e5fff5facc2d61192fd2
Parents: f343a67
Author: hahao <ha...@cloudera.com>
Authored: Wed Nov 15 14:26:09 2017 -0800
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Nov 21 23:05:15 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/KuduScanToken.java   | 20 ++++++++++++++++++++
 .../kudu/client/TestScannerMultiTablet.java     | 16 ++++++++++++++++
 src/kudu/client/client.proto                    |  4 ++++
 src/kudu/common/common.proto                    | 10 ++++++++++
 4 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9269e0ab/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 2e3d1d6..b85c37f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -223,6 +223,20 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       }
     }
 
+    if (message.hasReplicaSelection()) {
+      switch (message.getReplicaSelection()) {
+        case LEADER_ONLY: {
+          builder.replicaSelection(ReplicaSelection.LEADER_ONLY);
+          break;
+        }
+        case CLOSEST_REPLICA: {
+          builder.replicaSelection(ReplicaSelection.CLOSEST_REPLICA);
+          break;
+        }
+        default: throw new IllegalArgumentException("unknown replica selection policy");
+      }
+    }
+
     if (message.hasPropagatedTimestamp() &&
         message.getPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
       client.updateLastPropagatedTimestamp(message.getPropagatedTimestamp());
@@ -330,6 +344,12 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       proto.setLimit(limit);
       proto.setReadMode(readMode.pbVersion());
 
+      if (replicaSelection == ReplicaSelection.LEADER_ONLY) {
+        proto.setReplicaSelection(Common.ReplicaSelection.LEADER_ONLY);
+      } else if (replicaSelection == ReplicaSelection.CLOSEST_REPLICA) {
+        proto.setReplicaSelection(Common.ReplicaSelection.CLOSEST_REPLICA);
+      }
+
       // If the last propagated timestamp is set send it with the scan.
       if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
         proto.setPropagatedTimestamp(client.getLastPropagatedTimestamp());

http://git-wip-us.apache.org/repos/asf/kudu/blob/9269e0ab/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index e81d4bb..1bef824 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import org.apache.kudu.client.Client.ScanTokenPB;
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 
 public class TestScannerMultiTablet extends BaseKuduTest {
@@ -196,6 +197,21 @@ public class TestScannerMultiTablet extends BaseKuduTest {
   }
 
   @Test(timeout = 100000)
+  public void testScanTokenReplicaSelections() throws Exception {
+    ScanTokenPB.Builder pbBuilder = ScanTokenPB.newBuilder();
+    pbBuilder.setTableName(table.getName());
+    pbBuilder.setReplicaSelection(Common.ReplicaSelection.CLOSEST_REPLICA);
+    Client.ScanTokenPB scanTokenPB = pbBuilder.build();
+    final byte[] serializedToken = KuduScanToken.serialize(scanTokenPB);
+
+    // Deserialize the scan token into a scanner, and make sure it is using
+    // 'CLOSEST_REPLICA' selection policy.
+    KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, syncClient);
+    assertEquals(ReplicaSelection.CLOSEST_REPLICA, scanner.getReplicaSelection());
+    assertEquals(9, countRowsInScan(scanner));
+  }
+
+  @Test(timeout = 100000)
   public void testReadAtSnapshotNoTimestamp() throws Exception {
     // Perform scan in READ_AT_SNAPSHOT mode with no snapshot timestamp
     // specified. Verify that the scanner timestamp is set from the tablet

http://git-wip-us.apache.org/repos/asf/kudu/blob/9269e0ab/src/kudu/client/client.proto
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index 37e57c9..0cab818 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -97,6 +97,10 @@ message ScanTokenPB {
   // This is a hint, not a requirement: the server may send
   // arbitrarily fewer or more bytes than requested.
   optional uint32 batch_size_bytes = 15;
+
+  // The replica selection policy for the scan request.
+  // See common.proto for further information about replica selections.
+  optional ReplicaSelection replica_selection = 16 [default = LEADER_ONLY];
 }
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/9269e0ab/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index cfd7900..c156ecf 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -231,6 +231,16 @@ enum OrderMode {
   ORDERED = 2;
 }
 
+// Policy with which to choose among multiple replicas.
+enum ReplicaSelection {
+  UNKNOWN_REPLICA_SELECTION = 0;
+  // Select the LEADER replica.
+  LEADER_ONLY = 1;
+  // Select the closest replica to the client, or a random one if all replicas
+  // are equidistant.
+  CLOSEST_REPLICA = 2;
+}
+
 // The serialized format of a Kudu table partition schema.
 message PartitionSchemaPB {