Posted to commits@kudu.apache.org by ad...@apache.org on 2016/03/11 04:31:17 UTC

[1/2] incubator-kudu git commit: design-docs: add old multi-master heartbeating doc

Repository: incubator-kudu
Updated Branches:
  refs/heads/master a4506028a -> 0157ef606


design-docs: add old multi-master heartbeating doc

This is Alex's old multi-master heartbeating doc, imported from gdocs and
rewritten for markdown. I'm importing it to serve as context for a new
multi-master doc that I'm writing.

For those who have permission, the old gdoc can be found here:
https://docs.google.com/document/d/10qQwSLYV8Zbjr4BiT1MZU5G8e6h-5ONOR1HDQEqdZGs/edit

Change-Id: I21dd3a859326f9886600022a0c4209831679b00f
Reviewed-on: http://gerrit.cloudera.org:8080/2495
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/ce28d261
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/ce28d261
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/ce28d261

Branch: refs/heads/master
Commit: ce28d2616201107080c0dbd0799a721fe364d228
Parents: a450602
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Mar 8 18:51:32 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Mar 10 22:55:54 2016 +0000

----------------------------------------------------------------------
 docs/design-docs/README.md                      |   3 +-
 .../old-multi-master-heartbeating.md            | 163 +++++++++++++++++++
 2 files changed, 165 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ce28d261/docs/design-docs/README.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/README.md b/docs/design-docs/README.md
index b52063c..5ecc549 100644
--- a/docs/design-docs/README.md
+++ b/docs/design-docs/README.md
@@ -22,7 +22,7 @@ made.
 
 | Document | Component(s) | Discussion |
 | -------- | ------------ | ---------- |
-| [Scan Optimization & Partition Pruning](scan-optimization-partition-pruning.md) | Client, Tablet | [gerrit](http://gerrit.cloudera.org:8080/#/c/2149/) |
+| [Scan optimization and partition pruning](scan-optimization-partition-pruning.md) | Client, Tablet | [gerrit](http://gerrit.cloudera.org:8080/2149) |
 | [CFile format](cfile.md) | Tablet | N/A |
 | [Codegen API and impl. details](codegen.md) | Server | N/A |
 | [Consensus design](consensus.md) | Consensus | N/A |
@@ -34,3 +34,4 @@ made.
 | [Schema change design](schema-change.md) | Master, Tablet | N/A |
 | [Maintenance operation scheduling](triggering-maintenance-ops.md) | Master, Tablet Server | N/A |
 | [C++ client design and impl. details](cpp-client.md) | Client | N/A |
+| [(old) Heartbeating between tservers and multiple masters](old-multi-master-heartbeating.md) | Master | [gerrit](http://gerrit.cloudera.org:8080/2495) |

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ce28d261/docs/design-docs/old-multi-master-heartbeating.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/old-multi-master-heartbeating.md b/docs/design-docs/old-multi-master-heartbeating.md
new file mode 100644
index 0000000..0268b0e
--- /dev/null
+++ b/docs/design-docs/old-multi-master-heartbeating.md
@@ -0,0 +1,163 @@
+<!---
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# TabletServer support for multiple masters
+
+## Context
+
+This document was written on November 10th, 2014. The design was never
+implemented, but may serve as a useful starting point for future
+multi-master design discussions.
+
+## Existing TS to single master flow
+
+Currently a tablet server is provided with an address of a single
+master. The tablet server then periodically (default period: 1 second)
+sends heartbeats to the master. The heartbeats contain the tablet
+server’s node instance, and (optionally) the tablet server’s
+registration (hosts and ports) and/or the server’s tablet report.
+If a heartbeat can’t be delivered to the master at a scheduled time,
+the tablet server waits and tries again at the next scheduled
+time.
+
+### Heartbeats and in-memory state
+
+When the single master receives a heartbeat, it:
+
+1. Checks if the heartbeat contains a TS registration. If it does,
+   updates a mapping of tablet server UUID to TS registration. (This
+   is in-memory only.)
+2. If the heartbeat contains a tablet report, updates the
+   SysTables/SysTablets entries as needed based on the tablet report (the
+   hard state is updated).
+3. If the heartbeat came from a TS that has no entry in the in-memory
+   state (the UUID to TS registration mapping is empty for that TS),
+   requests that the TS (re-)send its registration and a full
+   tablet report. **This is needed in order to rebuild the in-memory
+   state on a master server that has freshly come up.**
+
+In all, the in-memory state (encapsulated in ts_manager.cc/.h) is not
+in the critical path of any lookup operation. **However, it plays a role
+in the table creation operations**: getting the current number of
+tablet servers that are up (to ensure that tablets can be created),
+assigning replicas to tablets, and modifying tablets on the tablet
+server (pending tasks for create table, delete table, alter table,
+etc…).
+
+#### Issue: CreateTable() unavailability on a “fresh” master
+
+As a result, when a single master is restarted, but before it has
+received heartbeats from N servers (where N is the replication
+factor), CreateTable() calls will fail (as we don’t know whether or not
+we have enough TS replicas up to execute the request). Additionally,
+until we’ve received heartbeats from enough tablet servers to be able
+to assign all tablets, background tablet alteration tasks may remain
+in PENDING state for a longer period of time (this, however, is per
+the contract for those operations).
+
+## Design alternatives
+
+### Don’t require a minimum server count for CreateTable()
+* Don’t refuse CreateTable() requests when the minimum number of
+  servers is not available. As is, CreateTable() can return without
+  having actually created the table: in this case, the tasks would
+  remain as pending until the requisite number of tablet servers have
+  come up.
+
+### Send heartbeats to all masters
+* After failover, immediate ability to do things that require
+  knowledge of TS liveness:
+  * Load balancing
+  * Direct clients to up-to-date tablet quorum leader
+* Detect connectivity issues between TS / Master Followers before failover
+* Disadvantage: Tablet Server has to keep a separate thread and state
+  for each master server for heartbeating. This is not that bad: we can
+  have multiple Heartbeater instances (1 per Master per TS).
+
+### Send heartbeats only to the leader master, use consensus to replicate
+* Only send to one server
+  * If timeout / server down, try next in round-robin fashion
+  * Follower Masters redirect to leader
+  * Retrying heartbeats is straightforward, there is no case where we
+    “give up”.
+* Less traffic between TS and Master
+* Disadvantage: Logic required to “follow the leader”
+* Disadvantage: Either there exists a period after failover during
+  which the follower does not know which servers host which tablets,
+  thus leading to less efficient routing, or every heartbeat must be
+  replicated via consensus (this would be slow, especially if logging),
+  or only “snapshots” of the soft state are replicated.
+
+## Handling multiple masters (“heartbeat to Master quorum” design)
+
+In order to handle multiple masters, tablet servers must support
+initialization with multiple master addresses, determining which
+master is the leader (much like this is currently done
+on the C++ client), and sending heartbeats to non-leader
+masters (to address the “CreateTable() unavailability on fresh master”
+issue above). Tablet servers will need to maintain per-master state,
+indicating whether that master is currently a leader, and whether they
+need to (re-)send their registration (or in the case of the leader, a
+full tablet report) to that master.
+
+The master server must be changed so that non-leader masters can
+handle heartbeats from tablet servers, applying the heartbeat
+information only to the in-memory state (i.e., disregarding full
+tablet reports) and not to the CatalogManager (they only process
+routine heartbeats and TS registration, updating the mapping of live
+tablet servers). Authoritative information (the (table, key) ->
+tablet, and tablet -> TS mapping) that is in the hot path of all
+client requests is served from hard state, which is replicated via
+the master’s consensus quorum. Client queries (and full tablet reports
+from tablet servers) will still be handled only by the leader master.
+
+The heartbeat response from a master server will also indicate whether
+or not that master server is the leader. The TS, upon receiving a
+response from the previous leader saying that it is no longer the
+leader, will determine the new leader (just as it does during the
+initialization routine) and send full tablet reports (if required) to
+that leader.
+
+## Summary of supported remote functionality, by role
+
+| RPC | Leader Master | Follower Master |
+| --- | ------------- | --------------- |
+| Ping(),<br/>TSHeartbeat() | Yes | Yes |
+| ListTables(),<br/>GetTableLocations(),<br/>GetTabletLocations(),<br/>GetTableSchema(),<br/><br/>CreateTable(),<br/>DeleteTable(),<br/>AlterTable() | Yes | **No**<br/><br/>The _List* and Get* calls need cache invalidation upon Update() to work properly on a follower, unless we disable the RWC cache._ |
+| ListTabletServers(),<br/>ListMasters(),<br/>GetMasterRegistration() | Yes | Yes |
+
+## Deliverables
+
+1. Extract any code that could be shared between the client, master,
+   and tablet server. Namely, this would be the code that handles server
+   registration and finding the quorum leader. Presently we have separate
+   data structures for the tablet server and master registration; these
+   could be unified as “server registration”, allowing code re-use in
+   places where it makes sense (provided the newly added abstractions do
+   not, by themselves, increase complexity). **Status**: DONE
+2. Support initializing a tablet server with multiple masters,
+   heartbeating to the leader master, and handling leader master
+   failures. NOTE: this doesn’t address the CreateTable() issue
+   above. **Status**: DONE.
+  1. Test 1: start a cluster, initialize a client, create a table,
+     change the leader master, and verify that we can perform a scan on
+     the tablet immediately after the new leader master is started.
+  2. Test 2: start a cluster, initialize a client, create table A,
+     change the leader master, sleep for a period sufficient for the
+     tablet servers to send their heartbeats to the master, and verify
+     that we can delete table A and create table B.
+3. Address the CreateTable() issue above: support sending heartbeats
+   to all of the masters (not just the leader).
+  1. Test: like “Test 2” above, but without waiting for the heartbeats
+     to go through.
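
The role-dependent heartbeat handling described in the design doc above (soft state on every master, hard state only on the leader) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Kudu code: `MasterSketch`, `Heartbeat`, `HeartbeatResponse`, and every field name are hypothetical, registrations are reduced to a string, and the hard-state update is reduced to a counter.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical, simplified message types (not the real Kudu protobufs).
struct Heartbeat {
  std::string ts_uuid;
  bool has_registration = false;
  std::string registration;        // stand-in for hosts and ports
  bool has_tablet_report = false;
};

struct HeartbeatResponse {
  bool is_leader = false;          // lets the tablet server notice a leader change
  bool needs_reregister = false;   // master has no soft state for this TS
  bool needs_full_report = false;  // only a leader asks for a full tablet report
};

// Hypothetical master that keeps soft state in memory and counts
// how many tablet reports it applied to (imaginary) hard state.
class MasterSketch {
 public:
  explicit MasterSketch(bool is_leader) : is_leader_(is_leader) {}

  HeartbeatResponse HandleHeartbeat(const Heartbeat& hb) {
    assert(!hb.ts_uuid.empty());
    HeartbeatResponse resp;
    resp.is_leader = is_leader_;

    // Registration updates only the in-memory UUID -> registration map,
    // on leaders and followers alike.
    if (hb.has_registration) {
      registrations_[hb.ts_uuid] = hb.registration;
    }

    // Tablet reports touch hard state, so followers disregard them.
    if (hb.has_tablet_report && is_leader_) {
      ++reports_applied_;
    }

    // Unknown tablet server (e.g. this master just restarted): ask it to
    // re-send its registration, plus a full report if we are the leader.
    if (registrations_.count(hb.ts_uuid) == 0) {
      resp.needs_reregister = true;
      resp.needs_full_report = is_leader_;
    }
    return resp;
  }

  std::size_t live_tservers() const { return registrations_.size(); }
  int reports_applied() const { return reports_applied_; }

 private:
  bool is_leader_;
  int reports_applied_ = 0;
  std::unordered_map<std::string, std::string> registrations_;
};
```

In this sketch a follower that has received registrations from enough tablet servers already knows how many are live, which is the property the "heartbeat to all masters" alternative relies on to avoid the CreateTable() gap after failover.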


[2/2] incubator-kudu git commit: tablet: avoid opening CFile writers for empty flushes

Posted by ad...@apache.org.
tablet: avoid opening CFile writers for empty flushes

I was looking at some failures of alter_table-randomized-test and
noticed that bootstrap is very slow in this test case. One of the
reasons is that the test triggers a lot of flushes of empty rowsets
as it replays alter tables. In one failed test run, there were 1395
instances of 'all input rows were GCed!'. These empty flushes were
wasting some IO to open cfile writers, write headers, and then delete
them.

This patch refactors some common code in the flush path between
AlterTable and normal flushes, and adds a short circuit for the case
of flushing an empty MRS.

Change-Id: I629f6a283f9963c0bb8e9be72d6aa4210f0aec72
Reviewed-on: http://gerrit.cloudera.org:8080/2441
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/0157ef60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/0157ef60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/0157ef60

Branch: refs/heads/master
Commit: 0157ef606252261683056e471e1a3506e3bfc0b3
Parents: ce28d26
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Mar 3 13:41:31 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Mar 11 03:30:50 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/alter_table-test.cc |  2 +-
 src/kudu/tablet/tablet.cc                      | 94 ++++++++++-----------
 src/kudu/tablet/tablet.h                       |  6 ++
 3 files changed, 50 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0157ef60/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 33aef92..60aa86d 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -674,7 +674,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterUpdatingRemovedColumn) {
   ASSERT_EQ("Dumping tablet:\n"
             "---------------------------\n"
             "MRS memrowset:\n"
-            "RowSet RowSet(1):\n"
+            "RowSet RowSet(0):\n"
             "(int32 c0=0, int32 c2=12345) Undos: [@2(DELETE)] Redos: []",
             JoinStrings(rows, "\n"));
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0157ef60/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index ad01767..a5b08f4 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -700,6 +700,15 @@ Status Tablet::FlushInternal(const RowSetsInCompaction& input,
   uint64_t start_insert_count = old_ms->debug_insert_count();
   int64_t mrs_being_flushed = old_ms->mrs_id();
 
+  if (old_ms->empty()) {
+    // If we're flushing an empty RowSet, we can short circuit here rather than
+    // waiting until the check at the end of DoCompactionAndFlush(). This avoids
+    // the need to create cfiles and write their headers only to later delete
+    // them.
+    LOG(INFO) << "MemRowSet was empty: no flush needed.";
+    return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
+  }
+
   if (flush_hooks_) {
     RETURN_NOT_OK_PREPEND(flush_hooks_->PostSwapNewMemRowSet(),
                           "PostSwapNewMemRowSet hook failed");
@@ -749,52 +758,33 @@ Status Tablet::AlterSchema(AlterSchemaTransactionState *tx_state) {
   // in-place.
   boost::lock_guard<Semaphore> lock(rowsets_flush_sem_);
 
-  RowSetsInCompaction input;
-  shared_ptr<MemRowSet> old_ms;
-  {
-    // If the current version >= new version, there is nothing to do.
-    bool same_schema = schema()->Equals(*tx_state->schema());
-    if (metadata_->schema_version() >= tx_state->schema_version()) {
-      LOG_WITH_PREFIX(INFO) << "Already running schema version " << metadata_->schema_version()
-                            << " got alter request for version " << tx_state->schema_version();
-      return Status::OK();
-    }
-
-    LOG_WITH_PREFIX(INFO) << "Alter schema from " << schema()->ToString()
-                          << " version " << metadata_->schema_version()
-                          << " to " << tx_state->schema()->ToString()
-                          << " version " << tx_state->schema_version();
-    DCHECK(schema_lock_.is_locked());
-    metadata_->SetSchema(*tx_state->schema(), tx_state->schema_version());
-    if (tx_state->has_new_table_name()) {
-      metadata_->SetTableName(tx_state->new_table_name());
-      if (metric_entity_) {
-        metric_entity_->SetAttribute("table_name", tx_state->new_table_name());
-      }
-    }
+  // If the current version >= new version, there is nothing to do.
+  bool same_schema = schema()->Equals(*tx_state->schema());
+  if (metadata_->schema_version() >= tx_state->schema_version()) {
+    LOG_WITH_PREFIX(INFO) << "Already running schema version " << metadata_->schema_version()
+                          << " got alter request for version " << tx_state->schema_version();
+    return Status::OK();
+  }
 
-    // If the current schema and the new one are equal, there is nothing to do.
-    if (same_schema) {
-      return metadata_->Flush();
+  LOG_WITH_PREFIX(INFO) << "Alter schema from " << schema()->ToString()
+                        << " version " << metadata_->schema_version()
+                        << " to " << tx_state->schema()->ToString()
+                        << " version " << tx_state->schema_version();
+  DCHECK(schema_lock_.is_locked());
+  metadata_->SetSchema(*tx_state->schema(), tx_state->schema_version());
+  if (tx_state->has_new_table_name()) {
+    metadata_->SetTableName(tx_state->new_table_name());
+    if (metric_entity_) {
+      metric_entity_->SetAttribute("table_name", tx_state->new_table_name());
     }
   }
 
-
-  // Replace the MemRowSet
-  {
-    boost::lock_guard<rw_spinlock> lock(component_lock_);
-    RETURN_NOT_OK(ReplaceMemRowSetUnlocked(&input, &old_ms));
+  // If the current schema and the new one are equal, there is nothing to do.
+  if (same_schema) {
+    return metadata_->Flush();
   }
 
-  // TODO(KUDU-915): ideally we would release the schema_lock here so that
-  // we don't block access to the tablet while we flush the MRS.
-  // However, doing so opens up some subtle issues with the ordering of
-  // the alter's COMMIT message against the COMMIT messages of other
-  // writes. A "big hammer" fix has been applied here to hold the lock
-  // all the way until the COMMIT message has been appended to the WAL.
-
-  // Flush the old MemRowSet
-  return FlushInternal(input, old_ms);
+  return FlushUnlocked();
 }
 
 Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
@@ -1213,17 +1203,7 @@ Status Tablet::DoCompactionOrFlush(const RowSetsInCompaction &input, int64_t mrs
   if (gced_all_input) {
     LOG_WITH_PREFIX(INFO) << op_name << " resulted in no output rows (all input rows "
                           << "were GCed!)  Removing all input rowsets.";
-
-    // Write out the new Tablet Metadata and remove old rowsets.
-    // TODO: Consensus catch-up may want to preserve the compaction inputs.
-    RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(),
-                                        RowSetMetadataVector(),
-                                        mrs_being_flushed),
-                          "Failed to flush new tablet metadata");
-
-    AtomicSwapRowSets(input.rowsets(), RowSetVector());
-
-    return Status::OK();
+    return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
   }
 
   // The RollingDiskRowSet writer wrote out one or more RowSets as the
@@ -1383,6 +1363,18 @@ Status Tablet::DoCompactionOrFlush(const RowSetsInCompaction &input, int64_t mrs
   return Status::OK();
 }
 
+Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
+                                            int mrs_being_flushed) {
+  // Write out the new Tablet Metadata and remove old rowsets.
+  RETURN_NOT_OK_PREPEND(FlushMetadata(rowsets,
+                                      RowSetMetadataVector(),
+                                      mrs_being_flushed),
+                        "Failed to flush new tablet metadata");
+
+  AtomicSwapRowSets(rowsets, RowSetVector());
+  return Status::OK();
+}
+
 Status Tablet::Compact(CompactFlags flags) {
   CHECK_EQ(state_, kOpen);
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0157ef60/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 0d2a8e1..be5e5de 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -418,6 +418,12 @@ class Tablet {
   Status DoCompactionOrFlush(const RowSetsInCompaction &input,
                              int64_t mrs_being_flushed);
 
+  // Handle the case in which a compaction or flush yielded no output rows.
+  // In this case, we just need to remove the rowsets in 'rowsets' from the
+  // metadata and flush it.
+  Status HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
+                                      int mrs_being_flushed);
+
   Status FlushMetadata(const RowSetVector& to_remove,
                        const RowSetMetadataVector& to_add,
                        int64_t mrs_being_flushed);