You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/01/13 07:51:09 UTC

[kudu] branch master updated: KUDU-1945 Auto-Incrementing Column

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ac6578d9 KUDU-1945 Auto-Incrementing Column
6ac6578d9 is described below

commit 6ac6578d9763c3e7856b313f50aa117acef6299c
Author: Abhishek Chennaka <ac...@cloudera.com>
AuthorDate: Wed Sep 28 09:16:11 2022 -0400

    KUDU-1945 Auto-Incrementing Column
    
    This patch adds a new column specification named auto_incrementing.
    The ColumnSchema type of this new column is INT64 and not UINT64
    because Impala doesn't support UINT64. These columns are populated on
    the server side with a monotonically increasing counter. This counter
    is local to each tablet, i.e. each tablet has a separate
    auto-incrementing counter. This is a step towards having tables with
    non-unique primary keys or, in the case of tables with just one
    tablet, a table-wide unique key.
    Upon receiving a write request, the leader replica:
    1. Fills in the replicate message with the auto-incrementing counter
       value for the first write op.
    2. Populates the auto-incrementing key into the rows being inserted
       during the prepare phase.
    3. Sends out the replicate message with the auto-incrementing counter
       and the original write request. The followers perform the same
       set of steps to populate the auto-incrementing column.
    This patch also adds the C++ client-side changes needed for basic
    writes and reads of this column, which enables end-to-end tests.
    
    Change-Id: I1dbde9095da78f6d1bd00adcc0a6e7dd63082bbc
    Reviewed-on: http://gerrit.cloudera.org:8080/19097
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/client/client-test.cc                     | 204 +++++++++++++++++++++
 src/kudu/client/client-unittest.cc                 |   3 +-
 src/kudu/client/scan_configuration.cc              |   1 +
 src/kudu/client/schema-internal.h                  |   2 +
 src/kudu/client/schema.cc                          |  34 +++-
 src/kudu/client/schema.h                           |  14 ++
 src/kudu/client/session-internal.cc                |   5 +-
 src/kudu/codegen/codegen-test.cc                   |   8 +-
 src/kudu/common/column_predicate-test.cc           |   3 +-
 src/kudu/common/common.proto                       |   3 +
 src/kudu/common/generic_iterators-test.cc          |   1 +
 src/kudu/common/partial_row-test.cc                |   4 +-
 src/kudu/common/partition-test.cc                  |   4 +-
 src/kudu/common/row_operations.cc                  |  63 +++++--
 src/kudu/common/row_operations.h                   |   9 +-
 src/kudu/common/schema-test.cc                     |  22 ++-
 src/kudu/common/schema.cc                          |  58 +++++-
 src/kudu/common/schema.h                           |  43 ++++-
 src/kudu/common/wire_protocol-test.cc              |   8 +-
 src/kudu/common/wire_protocol.cc                   |   6 +-
 src/kudu/integration-tests/CMakeLists.txt          |   1 +
 .../integration-tests/auto_incrementing-itest.cc   | 168 +++++++++++++++++
 src/kudu/tablet/CMakeLists.txt                     |   1 +
 src/kudu/tablet/all_types-scan-correctness-test.cc |  10 +-
 src/kudu/tablet/cfile_set-test.cc                  |   2 +-
 src/kudu/tablet/diskrowset-test.cc                 |   3 +-
 src/kudu/tablet/ops/write_op.cc                    |  21 ++-
 src/kudu/tablet/tablet-decoder-eval-test.cc        |   4 +-
 src/kudu/tablet/tablet-test-util.h                 |   3 +-
 src/kudu/tablet/tablet-test.cc                     |   3 +-
 src/kudu/tablet/tablet.cc                          |   6 +-
 src/kudu/tablet/tablet.h                           |  14 +-
 src/kudu/tablet/tablet_auto_incrementing-test.cc   | 118 ++++++++++++
 src/kudu/tools/kudu-tool-test.cc                   |   6 +-
 .../tserver/tablet_server_authorization-test.cc    |   7 +-
 src/kudu/tserver/tserver.proto                     |   8 +
 36 files changed, 797 insertions(+), 73 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index b9f9c2226..8723df23d 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -9837,5 +9837,209 @@ TEST_F(ReplicationFactorLimitsTest, MaxReplicationFactor) {
   }
 }
 
+class ClientTestAutoIncrementingColumn : public ClientTest {
+ public:
+  void SetUp() override {
+    // TODO(achennaka): Enable the feature flag here once it is implemented.
+
+    KuduTest::SetUp();  // NOTE(review): seems to bypass ClientTest::SetUp() on purpose; confirm.
+
+    // Start a minicluster with 3 tablet servers and wait for them to connect to the master.
+    InternalMiniClusterOptions options;
+    options.num_tablet_servers = 3;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(options)));
+    ASSERT_OK(cluster_->StartSync());
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  }
+};
+
+TEST_F(ClientTestAutoIncrementingColumn, ReadAndWrite) {
+  const string kTableName = "table_with_auto_incrementing_column";
+  KuduSchemaBuilder b;
+  // TODO(Marton): Once the NonUnique column spec is in place,
+  // update the column specs below to match the implementation.
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  b.AddColumn("auto_incrementing_id")->Type(KuduColumnSchema::INT64)
+      ->NotNull()->AutoIncrementing();
+  ASSERT_OK(b.Build(&schema_));
+
+  // Create a table with two range partitions on 'key', bounded by 0..10 and 10..20.
+  int lower_bound = 0;
+  int mid_bound = 10;
+  int upper_bound = 20;
+  unique_ptr<KuduPartialRow> lower0(schema_.NewRow());
+  unique_ptr<KuduPartialRow> upper0(schema_.NewRow());
+  unique_ptr<KuduPartialRow> lower1(schema_.NewRow());
+  unique_ptr<KuduPartialRow> upper1(schema_.NewRow());
+
+  ASSERT_OK(lower0->SetInt32("key", lower_bound));
+  ASSERT_OK(upper0->SetInt32("key", mid_bound));
+  ASSERT_OK(lower1->SetInt32("key", mid_bound));
+  ASSERT_OK(upper1->SetInt32("key", upper_bound));
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema_)
+            .set_range_partition_columns({"key"})
+            .add_range_partition(lower0.release(), upper0.release())
+            .add_range_partition(lower1.release(), upper1.release())
+            .num_replicas(3)
+            .Create());
+
+  // Write into these two partitions without specifying values for
+  // the auto-incrementing column.
+  shared_ptr<KuduSession> session = client_->NewSession();
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+  static constexpr auto kNumRows = 20;
+  for (int i = 0; i < kNumRows; i++) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    KuduPartialRow* row = insert->mutable_row();
+    ASSERT_OK(row->SetInt32("key", i));
+    ASSERT_OK(session->Apply(insert.release()));
+  }
+  FlushSessionOrDie(session);
+
+  // Read back the rows and confirm that the auto-incrementing column is set
+  // correctly in each partition, using each of the replica selection modes.
+  for (const auto mode: {KuduClient::LEADER_ONLY, KuduClient::CLOSEST_REPLICA,
+                         KuduClient::FIRST_REPLICA}) {
+    vector<string> rows;
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetSelection(mode));
+    ASSERT_OK(ScanToStrings(&scanner, &rows));
+    ASSERT_EQ(kNumRows, rows.size());
+    for (int i = 0; i < rows.size(); i++) {
+      ASSERT_EQ(Substitute("(int32 key=$0, int64 auto_incrementing_id=$1)", i,
+                           (i % 10) + 1), rows[i]);  // ids are expected to restart at 1 per partition
+    }
+  }
+}
+
+
+TEST_F(ClientTestAutoIncrementingColumn, ConcurrentWrites) {
+  const string kTableName = "concurrent_writes_auto_incrementing_column";
+  KuduSchemaBuilder b;
+  // TODO(Marton): Once the NonUnique column spec is in place,
+  // update the column specs below to match the implementation.
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  b.AddColumn("auto_incrementing_id")->Type(KuduColumnSchema::INT64)
+      ->NotNull()->AutoIncrementing();
+  ASSERT_OK(b.Build(&schema_));
+
+  static constexpr int num_clients = 8;
+  static constexpr int num_rows_per_client = 1000;
+
+  // Create a table with a single range partition covering every inserted key.
+  static constexpr int lower_bound = 0;
+  static constexpr int upper_bound = num_clients * num_rows_per_client;
+  unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+  unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+
+  ASSERT_OK(lower->SetInt32("key", lower_bound));
+  ASSERT_OK(upper->SetInt32("key", upper_bound));
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema_)
+            .set_range_partition_columns({"key"})
+            .add_range_partition(lower.release(), upper.release())
+            .num_replicas(3)
+            .Create());
+
+  // Write into the table with 'num_clients' clients concurrently without
+  // specifying values for the auto-incrementing column.
+  // 'clients' is pre-sized so that each writer thread stores into its own slot:
+  // growing a shared vector from several threads at once would be a data race.
+  vector<shared_ptr<KuduClient>> clients(num_clients);
+  vector<thread> threads;
+  for (int i = 0; i < num_clients; i++) {
+    threads.emplace_back([&, i] {
+      shared_ptr<KuduClient> client;
+      ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+                .Build(&client));
+      clients[i] = client;
+      shared_ptr<KuduSession> session = client->NewSession();
+      shared_ptr<KuduTable> table;
+      ASSERT_OK(client->OpenTable(kTableName, &table));
+      for (int j = num_rows_per_client * i; j < num_rows_per_client * (i + 1); j++) {
+        unique_ptr<KuduInsert> insert(table->NewInsert());
+        KuduPartialRow *row = insert->mutable_row();
+        ASSERT_OK(row->SetInt32("key", j));
+        ASSERT_OK(session->Apply(insert.release()));
+      }
+      FlushSessionOrDie(session);
+    });
+  }
+  // join() is sufficient synchronization, even if a writer bailed out on a failed ASSERT_OK().
+  for (auto& t : threads) {
+    t.join();
+  }
+  // Read back the rows with each replica selection mode and confirm that all
+  // modes return identical rows, auto-incrementing column values included.
+  static constexpr auto kNumRows = num_clients * num_rows_per_client;
+  vector<vector<string>> rows;
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+  for (const auto mode: {KuduClient::LEADER_ONLY, KuduClient::CLOSEST_REPLICA,
+                         KuduClient::FIRST_REPLICA}) {
+    vector<string> row;
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetSelection(mode));
+    ASSERT_OK(ScanToStrings(&scanner, &row));
+    ASSERT_EQ(kNumRows, row.size());
+    rows.push_back(row);
+  }
+  for (size_t i = 0; i + 1 < rows.size(); i++) {
+    SCOPED_TRACE(i);
+    ASSERT_TRUE(rows[i] == rows[i + 1]);
+  }
+}
+
+TEST_F(ClientTestAutoIncrementingColumn, Negatives) {
+  // TODO(Marton): Once the NonUnique column spec is in place,
+  // update the column specs below to match the implementation.
+  {  // An auto-incrementing column of a type other than INT64 is rejected.
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("auto_incrementing_id")->Type(KuduColumnSchema::INT32)
+        ->NotNull()->AutoIncrementing();
+    Status s = b.Build(&schema_);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column should be of type INT64", s.ToString());
+  }
+
+  {  // A nullable auto-incrementing column is rejected.
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("auto_incrementing_id")->Type(KuduColumnSchema::INT64)
+        ->Nullable()->AutoIncrementing();
+    Status s = b.Build(&schema_);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column should not be nullable", s.ToString());
+  }
+
+  {  // An auto-incrementing column with a default value is rejected.
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("auto_incrementing_id")->Type(KuduColumnSchema::INT64)
+        ->NotNull()->Default(KuduValue::FromInt(20))->AutoIncrementing();
+    Status s = b.Build(&schema_);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column cannot have a "
+                            "default value", s.ToString());
+  }
+
+  {  // An immutable auto-incrementing column is rejected.
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("auto_incrementing_id")->Type(KuduColumnSchema::INT64)
+        ->NotNull()->Immutable()->AutoIncrementing();
+    Status s = b.Build(&schema_);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ("Invalid argument: auto-incrementing column should not be immutable", s.ToString());
+  }
+}
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/client-unittest.cc b/src/kudu/client/client-unittest.cc
index 63fe3824f..a5c116e89 100644
--- a/src/kudu/client/client-unittest.cc
+++ b/src/kudu/client/client-unittest.cc
@@ -372,7 +372,8 @@ TEST(KuduColumnSchemaTest, TestEquals) {
 
   const int kDefaultOf7 = 7;
   KuduColumnSchema a32_dflt("a", KuduColumnSchema::INT32, /*is_nullable=*/false,
-                            /*is_immutable=*/false, /*default_value=*/&kDefaultOf7);
+                            /*is_immutable=*/false, /*is_auto_incrementing=*/false,
+                            /*default_value=*/&kDefaultOf7);
   ASSERT_NE(a32, a32_dflt);
 }
 
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index edf0921f9..f0b725e19 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -245,6 +245,7 @@ Status ScanConfiguration::AddIsDeletedColumn() {
                           IS_DELETED,
                           /*is_nullable=*/false,
                           /*is_immutable=*/false,
+                          /*is_auto_incrementing=*/false,
                           &read_default);
   cols.emplace_back(std::move(is_deleted));
 
diff --git a/src/kudu/client/schema-internal.h b/src/kudu/client/schema-internal.h
index 0b8357545..c892569a4 100644
--- a/src/kudu/client/schema-internal.h
+++ b/src/kudu/client/schema-internal.h
@@ -61,6 +61,7 @@ class KuduColumnSpec::Data {
   explicit Data(std::string name)
       : name(std::move(name)),
         primary_key(false),
+        auto_incrementing(false),
         remove_default(false) {
   }
 
@@ -83,6 +84,7 @@ class KuduColumnSpec::Data {
   std::optional<bool> immutable;
   bool primary_key;
   std::optional<KuduValue*> default_val;  // Owned.
+  bool auto_incrementing;
   bool remove_default;                      // For ALTER
   std::optional<std::string> rename_to;   // For ALTER
   std::optional<std::string> comment;
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index ae177d9cd..81f6888e0 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -338,6 +338,11 @@ KuduColumnSpec* KuduColumnSpec::PrimaryKey() {
   return this;
 }
 
+KuduColumnSpec* KuduColumnSpec::AutoIncrementing() {
+  data_->auto_incrementing = true;  // Only recorded here; checked in ToColumnSchema().
+  return this;  // Returning the spec allows chaining with the other setters.
+}
+
 KuduColumnSpec* KuduColumnSpec::NotNull() {
   data_->nullable = false;
   return this;
@@ -468,6 +473,27 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* col) const {
   bool nullable = data_->nullable ? data_->nullable.value() : true;
   bool immutable = data_->immutable ? data_->immutable.value() : false;
 
+  if (data_->auto_incrementing) {
+    if (internal_type != kudu::INT64) {
+      return Status::InvalidArgument("auto-incrementing column should be of type INT64");
+    }
+    if (nullable) {
+      return Status::InvalidArgument("auto-incrementing column should not be nullable");
+    }
+    if (immutable) {
+      return Status::InvalidArgument("auto-incrementing column should not be immutable");
+    }
+    if (data_->default_val) {
+      return Status::InvalidArgument("auto-incrementing column cannot have a "
+                                     "default value");
+    }
+    // TODO(Marton): Uncomment once the client patch promotes the
+    // auto increment column to primary key
+    //if (!data_->primary_key) {
+    //   return Status::InvalidArgument("auto-incrementing column is not set as primary key "
+    //                                  "column");
+    //}
+  }
   void* default_val = nullptr;
   // TODO(unknown): distinguish between DEFAULT NULL and no default?
   if (data_->default_val) {
@@ -488,7 +514,7 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* col) const {
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
   *col = KuduColumnSchema(data_->name, data_->type.value(), nullable,
-                          immutable, default_val,
+                          immutable, data_->auto_incrementing, default_val,
                           KuduColumnStorageAttributes(encoding, compression, block_size),
                           type_attrs,
                           data_->comment ? data_->comment.value() : "");
@@ -754,6 +780,7 @@ KuduColumnSchema::KuduColumnSchema(const string &name,
                                    DataType type,
                                    bool is_nullable,
                                    bool is_immutable,
+                                   bool is_auto_incrementing,
                                    const void* default_value,
                                    const KuduColumnStorageAttributes& storage_attributes,
                                    const KuduColumnTypeAttributes& type_attributes,
@@ -769,6 +796,7 @@ KuduColumnSchema::KuduColumnSchema(const string &name,
   col_ = new ColumnSchema(name, ToInternalDataType(type, type_attributes),
                           is_nullable,
                           is_immutable,
+                          is_auto_incrementing,
                           default_value, default_value, attr_private,
                           type_attr_private, comment);
 }
@@ -930,8 +958,8 @@ KuduColumnSchema KuduSchema::Column(size_t idx) const {
   KuduColumnTypeAttributes type_attrs(col.type_attributes().precision, col.type_attributes().scale,
                                       col.type_attributes().length);
   return KuduColumnSchema(col.name(), FromInternalDataType(col.type_info()->type()),
-                          col.is_nullable(), col.is_immutable(), col.read_default_value(),
-                          attrs, type_attrs, col.comment());
+                          col.is_nullable(), col.is_immutable(), col.is_auto_incrementing(),
+                          col.read_default_value(), attrs, type_attrs, col.comment());
 }
 
 bool KuduSchema::HasColumn(const std::string& col_name, KuduColumnSchema* col_schema) const {
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index d273a5787..8c15edc00 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -345,6 +345,7 @@ class KUDU_EXPORT KuduColumnSchema {
       DataType type,
       bool is_nullable = false,
       bool is_immutable = false,
+      bool is_auto_incrementing = false,
       const void* default_value = NULL, //NOLINT(modernize-use-nullptr)
       const KuduColumnStorageAttributes& storage_attributes = KuduColumnStorageAttributes(),
       const KuduColumnTypeAttributes& type_attributes = KuduColumnTypeAttributes(),
@@ -488,6 +489,19 @@ class KUDU_EXPORT KuduColumnSpec {
   /// @return Pointer to the modified object.
   KuduColumnSpec* PrimaryKey();
 
+  ///@{
+  /// Set the column to be auto-incrementing.
+  ///
+  /// Upon inserting rows into a table with an auto-incrementing column, the
+  /// column is automatically assigned values that are unique within the partition.
+  /// This makes such columns good candidates to include in a table's primary key.
+  ///
+  /// @note Whether a column is auto-incrementing may not be changed once
+  /// a table is created.
+  ///
+  /// @return Pointer to the modified object.
+  KuduColumnSpec* AutoIncrementing();
+
   /// Set the column to be not nullable.
   ///
   /// @note Column nullability may not be changed once a table is created.
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 0cb0be561..aec71592b 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -19,6 +19,7 @@
 
 #include <functional>
 #include <mutex>
+#include <type_traits>
 #include <utility>
 
 #include <glog/logging.h>
@@ -36,7 +37,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/messenger.h" // IWYU pragma: keep
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/logging.h"
 
@@ -351,7 +352,7 @@ Status CheckForNonNullableColumns(const KuduWriteOperation& op) {
   for (auto idx = 0; idx < num_columns; ++idx) {
     const ColumnSchema& col = schema->column(idx);
     if (!col.is_nullable() && !col.has_write_default() &&
-        !row.IsColumnSet(idx)) {
+        !row.IsColumnSet(idx) && !col.is_auto_incrementing()) {
       return Status::IllegalState(Substitute(
           "non-nullable column '$0' is not set", schema->column(idx).name()),
           KUDU_REDACT(op.ToString()));
diff --git a/src/kudu/codegen/codegen-test.cc b/src/kudu/codegen/codegen-test.cc
index eb2a840f0..8d5bc9d31 100644
--- a/src/kudu/codegen/codegen-test.cc
+++ b/src/kudu/codegen/codegen-test.cc
@@ -81,10 +81,10 @@ class CodegenTest : public KuduTest {
     base_ = SchemaBuilder(base_).Build(); // add IDs
 
     // Create an extended default schema
-    cols.emplace_back("int32-R ",  INT32, false, false, kI32R, nullptr);
-    cols.emplace_back("int32-RW",  INT32, false, false, kI32R, kI32W);
-    cols.emplace_back("str32-R ", STRING, false, false, kStrR, nullptr);
-    cols.emplace_back("str32-RW", STRING, false, false, kStrR, kStrW);
+    cols.emplace_back("int32-R ",  INT32, false, false, false, kI32R, nullptr);
+    cols.emplace_back("int32-RW",  INT32, false, false, false, kI32R, kI32W);
+    cols.emplace_back("str32-R ", STRING, false, false, false, kStrR, nullptr);
+    cols.emplace_back("str32-RW", STRING, false, false, false, kStrR, kStrW);
     defaults_.Reset(cols, 1);
     defaults_ = SchemaBuilder(defaults_).Build(); // add IDs
 
diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc
index 18de259a8..dc7ca2ad1 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -1463,7 +1463,8 @@ TEST_F(TestColumnPredicate, TestEquals) {
 
   const int kDefaultOf3 = 3;
   ColumnSchema c1dflt("c1", INT32, /*is_nullable=*/false,
-                      /*is_immutable=*/false, /*read_default=*/&kDefaultOf3);
+                      /*is_immutable=*/false, /*is_auto_incrementing=*/false,
+                      /*read_default=*/&kDefaultOf3);
   ASSERT_NE(ColumnPredicate::None(c1), ColumnPredicate::None(c1dflt));
 }
 
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 2779d8566..5e5005583 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -139,6 +139,9 @@ message ColumnSchemaPB {
   optional string comment = 12;
 
   optional bool immutable = 13 [default = false];
+
+  // Whether the column is auto-incrementing.
+  optional bool is_auto_incrementing = 14 [default = false];
 }
 
 message ColumnSchemaDeltaPB {
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 4cc1f4b67..9864f892f 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -88,6 +88,7 @@ static const Schema kIntSchemaWithVCol({ ColumnSchema("val", INT64),
                                          ColumnSchema("is_deleted", IS_DELETED,
                                                       /*is_nullable=*/false,
                                                       /*is_immutable=*/false,
+                                                      /*is_auto_incrementing=*/false,
                                                       /*read_default=*/&kIsDeletedReadDefault) },
                                        /*key_columns=*/1);
 
diff --git a/src/kudu/common/partial_row-test.cc b/src/kudu/common/partial_row-test.cc
index 03222bda3..ccb9e181b 100644
--- a/src/kudu/common/partial_row-test.cc
+++ b/src/kudu/common/partial_row-test.cc
@@ -43,9 +43,9 @@ class PartialRowTest : public KuduTest {
                 ColumnSchema("uint64_val", UINT64),
                 ColumnSchema("string_val", STRING, true),
                 ColumnSchema("binary_val", BINARY, true),
-                ColumnSchema("decimal_val", DECIMAL32, true, false, nullptr, nullptr,
+                ColumnSchema("decimal_val", DECIMAL32, true, false, false, nullptr, nullptr,
                              ColumnStorageAttributes(), ColumnTypeAttributes(6, 2)),
-                ColumnSchema("varchar_val", VARCHAR, true, false, nullptr, nullptr,
+                ColumnSchema("varchar_val", VARCHAR, true, false, false, nullptr, nullptr,
                              ColumnStorageAttributes(), ColumnTypeAttributes(10)) },
               1) {
     SeedRandom();
diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index de40eaea0..760f35b84 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -876,9 +876,9 @@ TEST_F(PartitionTest, TestIncrementRangePartitionStringBounds) {
 }
 
 TEST_F(PartitionTest, TestVarcharRangePartitions) {
-  Schema schema({ ColumnSchema("c1", VARCHAR, false, false, nullptr, nullptr,
+  Schema schema({ ColumnSchema("c1", VARCHAR, false, false, false, nullptr, nullptr,
                                ColumnStorageAttributes(), ColumnTypeAttributes(10)),
-                  ColumnSchema("c2", VARCHAR, false, false, nullptr, nullptr,
+                  ColumnSchema("c2", VARCHAR, false, false, false, nullptr, nullptr,
                                ColumnStorageAttributes(), ColumnTypeAttributes(10)) },
                   { ColumnId(0), ColumnId(1) }, 2);
 
diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 1ed07cdeb..43970b789 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -421,7 +421,8 @@ size_t RowOperationsPBDecoder::GetTabletColIdx(const ClientServerMapping& mappin
 
 Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row_storage,
                                                     const ClientServerMapping& mapping,
-                                                    DecodedRowOperation* op) {
+                                                    DecodedRowOperation* op,
+                                                    int64_t* auto_incrementing_counter) {
   const uint8_t* client_isset_map = nullptr;
   const uint8_t* client_null_map = nullptr;
 
@@ -451,6 +452,7 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
   // Now handle each of the columns passed by the user, replacing the defaults
   // from the prototype.
   Status row_status;
+  const auto auto_incrementing_col_idx = tablet_schema_->auto_incrementing_col_idx();
   for (size_t client_col_idx = 0;
        client_col_idx < client_schema_->num_columns();
        client_col_idx++) {
@@ -458,6 +460,7 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
     // ColumnSchema object since it has the most up-to-date default, nullability,
     // etc.
     size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
+    DCHECK_NE(tablet_col_idx, Schema::kColumnNotFound);
     const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
 
     bool isset = BitmapTest(client_isset_map, client_col_idx);
@@ -480,10 +483,31 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
             "NULL values not allowed for non-nullable column", col.ToString()));
         RETURN_NOT_OK(ReadColumnAndDiscard(col));
       }
+      if (PREDICT_FALSE(tablet_col_idx == auto_incrementing_col_idx)) {
+        static const Status err_field_incorrectly_set = Status::InvalidArgument(
+            "auto-incrementing column is incorrectly set");
+        op->SetFailureStatusOnce(err_field_incorrectly_set);
+        return err_field_incorrectly_set;
+      }
     } else {
-      // If the client didn't provide a value, then the column must either be nullable or
-      // have a default (which was already set in the prototype row).
-      if (PREDICT_FALSE(!(col.is_nullable() || col.has_write_default()))) {
+      // If the client didn't provide a value, check if it's an auto-incrementing
+      // field. If so, populate the field as appropriate.
+      if (tablet_col_idx == auto_incrementing_col_idx) {
+        if (*DCHECK_NOTNULL(auto_incrementing_counter) == INT64_MAX) {
+          static const Status err_max_value = Status::IllegalState("max auto-incrementing column "
+                                                                   "value reached");
+          op->SetFailureStatusOnce(err_max_value);
+          return err_max_value;
+        }
+        // For simplicity, we increment the auto-incrementing counter at this point regardless
+        // of any later failures in the op. The auto-incrementing column key space is large
+        // enough to not run out of values for any realistic workload.
+        (*auto_incrementing_counter)++;
+        memcpy(tablet_row.mutable_cell_ptr(tablet_col_idx), auto_incrementing_counter, 8);
+        BitmapChange(tablet_isset_bitmap, client_col_idx, true);
+      } else if (PREDICT_FALSE(!(col.is_nullable() || col.has_write_default()))) {
+        // Otherwise, the column must either be nullable or have a default (which
+        // was already set in the prototype row).
         op->SetFailureStatusOnce(Status::InvalidArgument("No value provided for required column",
                                                          col.ToString()));
       }
@@ -685,7 +709,8 @@ Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping
 }
 
 template <DecoderMode mode>
-Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops) {
+Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops,
+                                                int64_t* auto_incrementing_counter) {
   // TODO(todd): there's a bug here, in that if a client passes some column in
   // its schema that has been deleted on the server, it will fail even if the
   // client never actually specified any values for it.  For example, a DBA
@@ -715,7 +740,8 @@ Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops
     DecodedRowOperation op;
     op.type = type;
 
-    RETURN_NOT_OK(DecodeOp<mode>(type, prototype_row_storage, mapping, &op));
+    RETURN_NOT_OK(DecodeOp<mode>(type, prototype_row_storage, mapping, &op,
+                                 auto_incrementing_counter));
     ops->push_back(op);
   }
 
@@ -725,15 +751,25 @@ Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops
 template<>
 Status RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
     RowOperationsPB::Type type, const uint8_t* prototype_row_storage,
-    const ClientServerMapping& mapping, DecodedRowOperation* op) {
+    const ClientServerMapping& mapping, DecodedRowOperation* op,
+    int64_t* auto_incrementing_counter) {
   switch (type) {
     case RowOperationsPB::UNKNOWN:
       return Status::NotSupported("Unknown row operation type");
-    case RowOperationsPB::INSERT:
-    case RowOperationsPB::INSERT_IGNORE:
     case RowOperationsPB::UPSERT:
+      if (tablet_schema_->has_auto_incrementing()) {
+        return Status::NotSupported("Tables with auto-incrementing column do not support "
+                                    "UPSERT operations");
+      }
     case RowOperationsPB::UPSERT_IGNORE:
-      RETURN_NOT_OK(DecodeInsertOrUpsert(prototype_row_storage, mapping, op));
+      if (tablet_schema_->has_auto_incrementing()) {
+        return Status::NotSupported("Tables with auto-incrementing column do not support "
+                                    "UPSERT_IGNORE operations");
+      }
+    case RowOperationsPB::INSERT:
+    case RowOperationsPB::INSERT_IGNORE:
+      RETURN_NOT_OK(DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
+                                         auto_incrementing_counter));
       break;
     case RowOperationsPB::UPDATE:
     case RowOperationsPB::UPDATE_IGNORE:
@@ -751,7 +787,8 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
 template<>
 Status RowOperationsPBDecoder::DecodeOp<DecoderMode::SPLIT_ROWS>(
     RowOperationsPB::Type type, const uint8_t* /*prototype_row_storage*/,
-    const ClientServerMapping& mapping, DecodedRowOperation* op) {
+    const ClientServerMapping& mapping, DecodedRowOperation* op,
+    int64_t* /*auto_incrementing_counter*/) {
   switch (type) {
     case RowOperationsPB::UNKNOWN:
       return Status::NotSupported("Unknown row operation type");
@@ -771,9 +808,9 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::SPLIT_ROWS>(
 
 template
 Status RowOperationsPBDecoder::DecodeOperations<DecoderMode::SPLIT_ROWS>(
-    vector<DecodedRowOperation>* ops);
+    vector<DecodedRowOperation>* ops, int64_t* auto_incrementing_counter);
 template
 Status RowOperationsPBDecoder::DecodeOperations<DecoderMode::WRITE_OPS>(
-    vector<DecodedRowOperation>* ops);
+    vector<DecodedRowOperation>* ops, int64_t* auto_incrementing_counter);
 
 } // namespace kudu
diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h
index b69f3f7cb..f27a4afc6 100644
--- a/src/kudu/common/row_operations.h
+++ b/src/kudu/common/row_operations.h
@@ -122,7 +122,8 @@ class RowOperationsPBDecoder {
   ~RowOperationsPBDecoder();
 
   template <DecoderMode mode>
-  Status DecodeOperations(std::vector<DecodedRowOperation>* ops);
+  Status DecodeOperations(std::vector<DecodedRowOperation>* ops,
+                          int64_t* auto_incrementing_counter = nullptr);
 
  private:
   Status ReadOpType(RowOperationsPB::Type* type);
@@ -148,7 +149,8 @@ class RowOperationsPBDecoder {
 
   Status DecodeInsertOrUpsert(const uint8_t* prototype_row_storage,
                               const ClientServerMapping& mapping,
-                              DecodedRowOperation* op);
+                              DecodedRowOperation* op,
+                              int64_t* auto_incrementing_counter);
   //------------------------------------------------------------
   // Serialization/deserialization support
   //------------------------------------------------------------
@@ -165,7 +167,8 @@ class RowOperationsPBDecoder {
   // Returns an error if the type isn't allowed by the decoder mode.
   template <DecoderMode mode>
   Status DecodeOp(RowOperationsPB::Type type, const uint8_t* prototype_row_storage,
-                  const ClientServerMapping& mapping, DecodedRowOperation* op);
+                  const ClientServerMapping& mapping, DecodedRowOperation* op,
+                  int64_t* auto_incrementing_counter);
 
   const RowOperationsPB* const pb_;
   // If 'client_schema_' and 'tablet_schema_' are the same object, the mapping
diff --git a/src/kudu/common/schema-test.cc b/src/kudu/common/schema-test.cc
index f5a5b8bb1..b1f15ca88 100644
--- a/src/kudu/common/schema-test.cc
+++ b/src/kudu/common/schema-test.cc
@@ -232,13 +232,13 @@ TEST_P(ParameterizedSchemaTest, TestCopyAndMove) {
 // Test basic functionality of Schema definition with decimal columns
 TEST_F(TestSchema, TestSchemaWithDecimal) {
   ColumnSchema col1("key", STRING);
-  ColumnSchema col2("decimal32val", DECIMAL32, false, false,
+  ColumnSchema col2("decimal32val", DECIMAL32, false, false, false,
                     nullptr, nullptr, ColumnStorageAttributes(),
                     ColumnTypeAttributes(9, 4));
-  ColumnSchema col3("decimal64val", DECIMAL64, true, false,
+  ColumnSchema col3("decimal64val", DECIMAL64, true, false, false,
                     nullptr, nullptr, ColumnStorageAttributes(),
                     ColumnTypeAttributes(18, 10));
-  ColumnSchema col4("decimal128val", DECIMAL128, true, false,
+  ColumnSchema col4("decimal128val", DECIMAL128, true, false, false,
                     nullptr, nullptr, ColumnStorageAttributes(),
                     ColumnTypeAttributes(38, 2));
 
@@ -266,16 +266,16 @@ TEST_F(TestSchema, TestSchemaWithDecimal) {
 // Test Schema::Equals respects decimal column attributes
 TEST_F(TestSchema, TestSchemaEqualsWithDecimal) {
   ColumnSchema col1("key", STRING);
-  ColumnSchema col_18_10("decimal64val", DECIMAL64, true, false,
+  ColumnSchema col_18_10("decimal64val", DECIMAL64, true, false, false,
                          nullptr, nullptr, ColumnStorageAttributes(),
                          ColumnTypeAttributes(18, 10));
-  ColumnSchema col_18_9("decimal64val", DECIMAL64, true, false,
+  ColumnSchema col_18_9("decimal64val", DECIMAL64, true, false,false,
                         nullptr, nullptr, ColumnStorageAttributes(),
                         ColumnTypeAttributes(18, 9));
-  ColumnSchema col_17_10("decimal64val", DECIMAL64, true, false,
+  ColumnSchema col_17_10("decimal64val", DECIMAL64, true, false, false,
                          nullptr, nullptr, ColumnStorageAttributes(),
                          ColumnTypeAttributes(17, 10));
-  ColumnSchema col_17_9("decimal64val", DECIMAL64, true, false,
+  ColumnSchema col_17_9("decimal64val", DECIMAL64, true, false, false,
                         nullptr, nullptr, ColumnStorageAttributes(),
                         ColumnTypeAttributes(17, 9));
 
@@ -422,7 +422,7 @@ TEST_F(TestSchema, TestProjectMissingColumn) {
   Schema schema3({ ColumnSchema("val", UINT32), ColumnSchema("non_present", UINT32, true) }, 0);
   uint32_t default_value = 15;
   Schema schema4({ ColumnSchema("val", UINT32),
-                   ColumnSchema("non_present", UINT32, false, false, &default_value) },
+                   ColumnSchema("non_present", UINT32, false, false, false, &default_value) },
                  0);
 
   RowProjector row_projector(&schema1, &schema2);
@@ -492,7 +492,9 @@ TEST_F(TestSchema, TestGetMappedReadProjection) {
   const bool kReadDefault = false;
   Schema projection({ ColumnSchema("key", STRING),
                       ColumnSchema("deleted", IS_DELETED,
-                                   /*is_nullable=*/false, /*is_immutable=*/false,
+                                   /*is_nullable=*/false,
+                                   /*is_immutable=*/false,
+                                   /*is_auto_incrementing=*/false,
                                    /*read_default=*/&kReadDefault) },
                     1);
 
@@ -522,6 +524,7 @@ TEST_F(TestSchema, TestGetMappedReadProjection) {
                                          ColumnSchema("deleted", IS_DELETED,
                                                       /*is_nullable=*/true,
                                                       /*is_immutable=*/false,
+                                                      /*is_auto_incrementing=*/false,
                                                       /*read_default=*/&kReadDefault) },
                                        1);
   ASSERT_FALSE(s.ok());
@@ -532,6 +535,7 @@ TEST_F(TestSchema, TestGetMappedReadProjection) {
                                     ColumnSchema("deleted", IS_DELETED,
                                                  /*is_nullable=*/false,
                                                  /*is_immutable=*/false,
+                                                 /*is_auto_incrementing*/false,
                                                  /*read_default=*/nullptr) },
                                   1);
   ASSERT_FALSE(s.ok());
diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc
index dc7adf0fd..46100f48e 100644
--- a/src/kudu/common/schema.cc
+++ b/src/kudu/common/schema.cc
@@ -228,6 +228,7 @@ void Schema::CopyFrom(const Schema& other) {
 
   first_is_deleted_virtual_column_idx_ = other.first_is_deleted_virtual_column_idx_;
   has_nullables_ = other.has_nullables_;
+  auto_incrementing_col_idx_ = other.auto_incrementing_col_idx_;
 }
 
 Schema::Schema(Schema&& other) noexcept
@@ -239,7 +240,8 @@ Schema::Schema(Schema&& other) noexcept
       name_to_index_(std::move(other.name_to_index_)),
       id_to_index_(std::move(other.id_to_index_)),
       first_is_deleted_virtual_column_idx_(other.first_is_deleted_virtual_column_idx_),
-      has_nullables_(other.has_nullables_) {
+      has_nullables_(other.has_nullables_),
+      auto_incrementing_col_idx_(other.auto_incrementing_col_idx_) {
 }
 
 Schema& Schema::operator=(Schema&& other) noexcept {
@@ -253,6 +255,7 @@ Schema& Schema::operator=(Schema&& other) noexcept {
     first_is_deleted_virtual_column_idx_ = other.first_is_deleted_virtual_column_idx_;
     has_nullables_ = other.has_nullables_;
     name_to_index_ = std::move(other.name_to_index_);
+    auto_incrementing_col_idx_ = other.auto_incrementing_col_idx_;
   }
   return *this;
 }
@@ -279,14 +282,43 @@ Status Schema::Reset(vector<ColumnSchema> cols,
   }
 
   // Verify that the key columns are not nullable
+  int auto_incrementing_col_idx = kColumnNotFound;
   for (int i = 0; i < key_columns; ++i) {
     if (PREDICT_FALSE(cols_[i].is_nullable())) {
       return Status::InvalidArgument(
         "Bad schema", Substitute("Nullable key columns are not supported: $0",
                                  cols_[i].name()));
     }
+    if (cols_[i].is_auto_incrementing()) {
+      if (auto_incrementing_col_idx != kColumnNotFound) {
+        return Status::InvalidArgument(
+            "Bad schema", "Schemas can have at most one auto-incrementing column");
+      }
+      if (cols_[i].type_info()->type() != INT64) {
+        return Status::InvalidArgument(
+            "Bad schema", "auto-incrementing column should be of type int64");
+      }
+      if (cols_[i].is_immutable()) {
+        return Status::InvalidArgument(
+            "Bad schema", "auto-incrementing column should be immutable");
+      }
+      auto_incrementing_col_idx = i;
+    }
   }
 
+  // TODO(achennaka): remove the section below once auto-incrementing cols are key cols.
+  for (int i = 0; i < cols_.size(); i++) {
+    if (cols_[i].is_auto_incrementing()) {
+      auto_incrementing_col_idx = i;
+      if (cols_[i].type_info()->type() != INT64) {
+        return Status::InvalidArgument(
+            "Bad schema", "auto-incrementing column should be of type int64");
+      }
+      break;
+    }
+  }
+
+  auto_incrementing_col_idx_ = auto_incrementing_col_idx;
   // Calculate the offset of each column in the row format.
   col_offsets_.clear();
   col_offsets_.reserve(cols_.size() + 1);  // Include space for total byte size at the end.
@@ -612,12 +644,34 @@ Status SchemaBuilder::AddColumn(const std::string& name,
                                 bool is_immutable,
                                 const void* read_default,
                                 const void* write_default) {
+  return AddColumn(name, type, is_nullable, is_immutable, false, read_default, write_default);
+}
+
+Status SchemaBuilder::AddColumn(const std::string& name,
+                                DataType type,
+                                bool is_nullable,
+                                bool is_immutable,
+                                bool is_auto_incrementing,
+                                const void* read_default,
+                                const void* write_default) {
   if (name.empty()) {
     return Status::InvalidArgument("column name must be non-empty");
   }
 
+  if (is_auto_incrementing) {
+    if (is_nullable || is_immutable) {
+      return Status::InvalidArgument("auto-incrementing column cannot be nullable or immutable");
+    }
+    if (read_default != nullptr || write_default != nullptr) {
+      return Status::InvalidArgument("auto-incrementing column cannot have read/write "
+                                     "defaults set");
+    }
+    if (type != kudu::INT64) {
+      return Status::InvalidArgument("auto-incrementing column should be of type INT64");
+    }
+  }
   return AddColumn(ColumnSchema(name, type, is_nullable, is_immutable,
-                                read_default, write_default), false);
+                                is_auto_incrementing, read_default, write_default), false);
 }
 
 Status SchemaBuilder::RemoveColumn(const string& name) {
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 21ea34506..c7154d0fa 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -212,6 +212,10 @@ class ColumnSchema {
   // is_nullable: true if a row value can be null
   // is_immutable: true if the column is immutable.
   //    Immutable column means the cell value can not be updated after the first insert.
+  // is_auto_incrementing: true if the column is an auto-incrementing column.
+  //    An auto-incrementing column cannot be written to or updated by a client; it is
+  //    filled in by Kudu by incrementing the highest value previously written to the tablet.
+  //    There can only be a single auto-incrementing column per table.
   // read_default: default value used on read if the column was not present before alter.
   //    The value will be copied and released on ColumnSchema destruction.
   // write_default: default value added to the row if the column value was
@@ -224,13 +228,15 @@ class ColumnSchema {
   //   ColumnSchema col_b("b", STRING, true);
   //   ColumnSchema col_b("b", STRING, false, true);
   //   uint32_t default_i32 = -15;
-  //   ColumnSchema col_c("c", INT32, false, false, &default_i32);
+  //   ColumnSchema col_c("c", INT32, false, false, false, &default_i32);
   //   Slice default_str("Hello");
-  //   ColumnSchema col_d("d", STRING, false, false, &default_str);
+  //   ColumnSchema col_d("d", STRING, false, false, false, &default_str);
+  //   ColumnSchema col_e("e", INT64, false, false, true);
   ColumnSchema(std::string name,
                DataType type,
                bool is_nullable = false,
                bool is_immutable = false,
+               bool is_auto_incrementing = false,
                const void* read_default = nullptr,
                const void* write_default = nullptr,
                ColumnStorageAttributes attributes = ColumnStorageAttributes(),
@@ -240,6 +246,7 @@ class ColumnSchema {
         type_info_(GetTypeInfo(type)),
         is_nullable_(is_nullable),
         is_immutable_(is_immutable),
+        is_auto_incrementing_(is_auto_incrementing),
         read_default_(read_default ? std::make_shared<Variant>(type, read_default) : nullptr),
         attributes_(attributes),
         type_attributes_(type_attributes),
@@ -263,6 +270,10 @@ class ColumnSchema {
     return is_immutable_;
   }
 
+  bool is_auto_incrementing() const {
+    return is_auto_incrementing_;
+  }
+
   const std::string& name() const {
     return name_;
   }
@@ -387,6 +398,10 @@ class ColumnSchema {
       if (is_immutable_ != other.is_immutable_) {
         return false;
       }
+
+      if (is_auto_incrementing_ != other.is_auto_incrementing_) {
+        return false;
+      }
     }
     return true;
   }
@@ -454,6 +469,7 @@ class ColumnSchema {
   const TypeInfo* type_info_;
   bool is_nullable_;
   bool is_immutable_;
+  bool is_auto_incrementing_;
   // use shared_ptr since the ColumnSchema is always copied around.
   std::shared_ptr<Variant> read_default_;
   std::shared_ptr<Variant> write_default_;
@@ -482,7 +498,8 @@ class Schema {
       // the default (32).
       name_to_index_(1),
       first_is_deleted_virtual_column_idx_(kColumnNotFound),
-      has_nullables_(false) {
+      has_nullables_(false),
+      auto_incrementing_col_idx_(kColumnNotFound) {
     name_to_index_.set_empty_key(StringPiece());
   }
 
@@ -624,6 +641,14 @@ class Schema {
     return iter->second;
   }
 
+  int auto_incrementing_col_idx() const {
+    return auto_incrementing_col_idx_;
+  }
+
+  bool has_auto_incrementing() const {
+    return auto_incrementing_col_idx_ != kColumnNotFound;
+  }
+
   // Returns true if the schema contains nullable columns
   bool has_nullables() const {
     return has_nullables_;
@@ -1011,6 +1036,10 @@ class Schema {
   // Cached indicator whether any columns are nullable.
   bool has_nullables_;
 
+  // Cached index of the auto-incrementing column, or kColumnNotFound if no
+  // such column exists in the schema.
+  int auto_incrementing_col_idx_;
+
   // NOTE: if you add more members, make sure to add the appropriate code to
   // CopyFrom() and the move constructor and assignment operator as well, to
   // prevent subtle bugs.
@@ -1085,6 +1114,14 @@ class SchemaBuilder {
                    const void* read_default,
                    const void* write_default);
 
+  Status AddColumn(const std::string& name,
+                   DataType type,
+                   bool is_nullable,
+                   bool is_immutable,
+                   bool is_auto_incrementing,
+                   const void* read_default,
+                   const void* write_default);
+
   Status RemoveColumn(const std::string& name);
 
   Status RenameColumn(const std::string& old_name, const std::string& new_name);
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 175866c7b..48e4539dd 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -722,7 +722,7 @@ TEST_F(WireProtocolTest, TestColumnDefaultValue) {
   ASSERT_FALSE(col1fpb->has_write_default());
   ASSERT_TRUE(col1fpb->read_default_value() == nullptr);
 
-  ColumnSchema col2("col2", STRING, false, false, &read_default_str);
+  ColumnSchema col2("col2", STRING, false, false, false, &read_default_str);
   ColumnSchemaToPB(col2, &pb);
   optional<ColumnSchema> col2fpb;
   ASSERT_OK(ColumnSchemaFromPB(pb, &col2fpb));
@@ -731,7 +731,7 @@ TEST_F(WireProtocolTest, TestColumnDefaultValue) {
   ASSERT_EQ(read_default_str, *static_cast<const Slice *>(col2fpb->read_default_value()));
   ASSERT_EQ(nullptr, static_cast<const Slice *>(col2fpb->write_default_value()));
 
-  ColumnSchema col3("col3", STRING, false, false, &read_default_str, &write_default_str);
+  ColumnSchema col3("col3", STRING, false, false, false, &read_default_str, &write_default_str);
   ColumnSchemaToPB(col3, &pb);
   optional<ColumnSchema> col3fpb;
   ASSERT_OK(ColumnSchemaFromPB(pb, &col3fpb));
@@ -740,7 +740,7 @@ TEST_F(WireProtocolTest, TestColumnDefaultValue) {
   ASSERT_EQ(read_default_str, *static_cast<const Slice *>(col3fpb->read_default_value()));
   ASSERT_EQ(write_default_str, *static_cast<const Slice *>(col3fpb->write_default_value()));
 
-  ColumnSchema col4("col4", UINT32, false, false, &read_default_u32);
+  ColumnSchema col4("col4", UINT32, false, false, false, &read_default_u32);
   ColumnSchemaToPB(col4, &pb);
   optional<ColumnSchema> col4fpb;
   ASSERT_OK(ColumnSchemaFromPB(pb, &col4fpb));
@@ -749,7 +749,7 @@ TEST_F(WireProtocolTest, TestColumnDefaultValue) {
   ASSERT_EQ(read_default_u32, *static_cast<const uint32_t *>(col4fpb->read_default_value()));
   ASSERT_EQ(nullptr, static_cast<const uint32_t *>(col4fpb->write_default_value()));
 
-  ColumnSchema col5("col5", UINT32, false, false, &read_default_u32, &write_default_u32);
+  ColumnSchema col5("col5", UINT32, false, false, false, &read_default_u32, &write_default_u32);
   ColumnSchemaToPB(col5, &pb);
   optional<ColumnSchema> col5fpb;
   ASSERT_OK(ColumnSchemaFromPB(pb, &col5fpb));
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 1c7625ce5..f16f7f27b 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -270,6 +270,9 @@ void ColumnSchemaToPB(const ColumnSchema& col_schema, ColumnSchemaPB *pb, int fl
   if (!col_schema.comment().empty() && !(flags & SCHEMA_PB_WITHOUT_COMMENT)) {
     pb->set_comment(col_schema.comment());
   }
+  if (col_schema.is_auto_incrementing()) {
+    pb->set_is_auto_incrementing(true);
+  }
 }
 
 Status ColumnSchemaFromPB(const ColumnSchemaPB& pb, optional<ColumnSchema>* col_schema) {
@@ -335,8 +338,9 @@ Status ColumnSchemaFromPB(const ColumnSchemaPB& pb, optional<ColumnSchema>* col_
   // regardless of whether has_comment() is true or false.
   // https://developers.google.com/protocol-buffers/docs/proto#optional
   bool immutable = pb.has_immutable() ? pb.immutable() : false;
+  bool auto_incrementing = pb.has_is_auto_incrementing() ? pb.is_auto_incrementing() : false;
   *col_schema = ColumnSchema(pb.name(), pb.type(), pb.is_nullable(),
-                             immutable,
+                             immutable, auto_incrementing,
                              read_default_ptr, write_default_ptr,
                              attributes, type_attributes, pb.comment());
   return Status::OK();
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 39b76ac41..0e7ca24b5 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -63,6 +63,7 @@ ADD_KUDU_TEST(alter_table-randomized-test NUM_SHARDS 3 PROCESSORS 4)
 ADD_KUDU_TEST(alter_table-test PROCESSORS 3)
 ADD_KUDU_TEST(auth_token_expire-itest)
 ADD_KUDU_TEST(authz_token-itest PROCESSORS 2)
+ADD_KUDU_TEST(auto_incrementing-itest)
 ADD_KUDU_TEST(catalog_manager_tsk-itest PROCESSORS 2)
 ADD_KUDU_TEST(varchar-itest)
 ADD_KUDU_TEST(client_failover-itest)
diff --git a/src/kudu/integration-tests/auto_incrementing-itest.cc b/src/kudu/integration-tests/auto_incrementing-itest.cc
new file mode 100644
index 000000000..7086c731a
--- /dev/null
+++ b/src/kudu/integration-tests/auto_incrementing-itest.cc
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Integration test for the auto-incrementing column: verifies that every tablet
+// replica ends up with the same server-assigned counter values.
+
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::rpc::RpcController;
+using kudu::tablet::TabletReplica;
+using kudu::tserver::NewScanRequestPB;
+using kudu::tserver::ScanResponsePB;
+using kudu::tserver::ScanRequestPB;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace itest {
+
+static const char* const kTableName = "test-table";
+static const int kNumTabletServers = 3;
+static const int kNumRows = 200;
+
+// Fixture that starts a kNumTabletServers-node internal mini cluster and offers helpers
+// to create, populate and scan a table that has an auto-incrementing column.
+class AutoIncrementingItest : public tserver::TabletServerTestBase {
+ public:
+  // Starts the mini cluster and connects 'client_' to it.
+  void SetUp() override {
+    TabletServerTestBase::SetUp();
+    cluster::InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = kNumTabletServers;
+    cluster_.reset(new cluster::InternalMiniCluster(env_, opts));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  }
+
+  // Creates the test table: INT32 primary key 'c0' and auto-incrementing INT64 'c1'.
+  // Despite the name, no rows are written here; InsertData() does that.
+  Status CreateTableWithData() {
+    KuduSchemaBuilder b;
+    b.AddColumn("c0")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+    b.AddColumn("c1")->Type(KuduColumnSchema::INT64)->NotNull()->AutoIncrementing();
+    RETURN_NOT_OK(b.Build(&kudu_schema_));
+
+    // One range partition bounded by 0 and kNumRows, matching the keys InsertData() writes.
+    unique_ptr<KuduPartialRow> lower(kudu_schema_.NewRow());
+    unique_ptr<KuduPartialRow> upper(kudu_schema_.NewRow());
+    RETURN_NOT_OK(lower->SetInt32("c0", 0));
+    RETURN_NOT_OK(upper->SetInt32("c0", kNumRows));
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    return table_creator->table_name(kTableName)
+        .schema(&kudu_schema_)
+        .set_range_partition_columns({"c0"})
+        .add_range_partition(lower.release(), upper.release())
+        .num_replicas(kNumTabletServers)
+        .Create();
+  }
+
+  // Inserts kNumRows rows with 'c0' = 0..kNumRows-1; 'c1' is never set by the client.
+  Status InsertData() {
+    RETURN_NOT_OK(client_->OpenTable(kTableName, &table_));
+    shared_ptr<KuduSession> session(client_->NewSession());
+    for (int i = 0; i < kNumRows; i++) {
+      unique_ptr<KuduInsert> insert(table_->NewInsert());
+      RETURN_NOT_OK(insert->mutable_row()->SetInt32("c0", i));
+      RETURN_NOT_OK(session->Apply(insert.release()));
+    }
+    return Status::OK();
+  }
+
+  // Scans 'tablet_id' on tablet server 'ts' in READ_LATEST mode, projecting 'c0' and
+  // 'c1', and hands the response to StringifyRowsFromResponse() to fill 'results'.
+  Status ScanTablet(int ts, const string& tablet_id, vector<string>* results) {
+    ScanResponsePB resp;
+    RpcController rpc;
+    ScanRequestPB req;
+
+    NewScanRequestPB* scan = req.mutable_new_scan_request();
+    scan->set_tablet_id(tablet_id);
+    scan->set_read_mode(READ_LATEST);
+
+    Schema schema({ ColumnSchema("c0", INT32),
+                    ColumnSchema("c1", INT64, false, false, /*is_auto_incrementing=*/true) }, 2);
+    RETURN_NOT_OK(SchemaToColumnPBs(schema, scan->mutable_projected_columns()));
+    RETURN_NOT_OK(cluster_->tserver_proxy(ts)->Scan(req, &resp, &rpc));
+    StringifyRowsFromResponse(schema, rpc, &resp, results);
+    return Status::OK();
+  }
+
+ protected:
+  unique_ptr<cluster::InternalMiniCluster> cluster_;
+  KuduSchema kudu_schema_;
+  shared_ptr<KuduClient> client_;
+  shared_ptr<KuduTable> table_;
+};
+
+// Each replica's scan must return kNumRows rows where row i reads (c0=i, c1=i+1).
+TEST_F(AutoIncrementingItest, TestAutoIncrementingItest) {
+  ASSERT_OK(CreateTableWithData());
+  ASSERT_OK(InsertData());
+  for (int j = 0; j < kNumTabletServers; j++) {
+    vector<scoped_refptr<TabletReplica>> replicas;
+    cluster_->mini_tablet_server(j)->server()->tablet_manager()->GetTabletReplicas(&replicas);
+    // Hard asserts (DCHECK is compiled out of release builds) guard replicas[0], results[i].
+    ASSERT_EQ(1, replicas.size());
+    vector<string> results;
+    ASSERT_OK(ScanTablet(j, replicas[0]->tablet_id(), &results));
+    ASSERT_EQ(kNumRows, results.size());
+    for (int i = 0; i < kNumRows; i++) {
+      ASSERT_EQ(Substitute("(int32 c0=$0, int64 c1=$1)", i, i + 1), results[i]);
+    }
+  }
+}
+} // namespace itest
+} // namespace kudu
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index a9a5867a6..8ae0dbf1b 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -120,6 +120,7 @@ ADD_KUDU_TEST(tablet-decoder-eval-test)
 ADD_KUDU_TEST(tablet-pushdown-test)
 ADD_KUDU_TEST(tablet-schema-test)
 ADD_KUDU_TEST(tablet-test)
+ADD_KUDU_TEST(tablet_auto_incrementing-test)
 ADD_KUDU_TEST(tablet_bootstrap-test)
 ADD_KUDU_TEST(tablet_history_gc-test)
 ADD_KUDU_TEST(tablet_metadata-test)
diff --git a/src/kudu/tablet/all_types-scan-correctness-test.cc b/src/kudu/tablet/all_types-scan-correctness-test.cc
index 859a3194c..de3b57c3e 100644
--- a/src/kudu/tablet/all_types-scan-correctness-test.cc
+++ b/src/kudu/tablet/all_types-scan-correctness-test.cc
@@ -79,9 +79,9 @@ static const int kStrlen = 10;
 struct RowOpsBase {
   RowOpsBase(DataType type, EncodingType encoding) : type_(type), encoding_(encoding) {
     schema_ = Schema({ColumnSchema("key", INT32),
-                     ColumnSchema("val_a", type, true, false, nullptr, nullptr,
+                     ColumnSchema("val_a", type, true, false, false, nullptr, nullptr,
                          ColumnStorageAttributes(encoding, DEFAULT_COMPRESSION)),
-                     ColumnSchema("val_b", type, true, false, nullptr, nullptr,
+                     ColumnSchema("val_b", type, true, false, false, nullptr, nullptr,
                          ColumnStorageAttributes(encoding, DEFAULT_COMPRESSION))}, 1);
 
   }
@@ -368,11 +368,11 @@ public:
     ASSERT_OK(builder.AddColumn("val_c", rowops_.type_, true, default_ptr, nullptr));
     AlterSchema(builder.Build());
     altered_schema_ = Schema({ColumnSchema("key", INT32),
-                     ColumnSchema("val_a", rowops_.type_, true, false, nullptr, nullptr,
+                     ColumnSchema("val_a", rowops_.type_, true, false, false, nullptr, nullptr,
                          ColumnStorageAttributes(rowops_.encoding_, DEFAULT_COMPRESSION)),
-                     ColumnSchema("val_b", rowops_.type_, true, false, nullptr, nullptr,
+                     ColumnSchema("val_b", rowops_.type_, true, false, false, nullptr, nullptr,
                          ColumnStorageAttributes(rowops_.encoding_, DEFAULT_COMPRESSION)),
-                     ColumnSchema("val_c", rowops_.type_, true, false, default_ptr, nullptr,
+                     ColumnSchema("val_c", rowops_.type_, true, false, false, default_ptr, nullptr,
                          ColumnStorageAttributes(rowops_.encoding_, DEFAULT_COMPRESSION))}, 1);
   }
 
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index d50924488..c63c05cea 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -89,7 +89,7 @@ class TestCFileSet : public KuduRowSetTest {
  public:
   TestCFileSet() :
     KuduRowSetTest(Schema({ ColumnSchema("c0", INT32),
-                            ColumnSchema("c1", INT32, false, false,
+                            ColumnSchema("c1", INT32, false, false, false,
                                          nullptr, nullptr, GetRLEStorage()),
                             ColumnSchema("c2", INT32, true) }, 1))
   {}
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index 575e28b61..d645bdaa3 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -845,7 +845,8 @@ TEST_P(DiffScanRowSetTest, TestFuzz) {
     if (add_vc_is_deleted) {
       bool read_default = false;
       col_schemas.emplace_back("is_deleted", IS_DELETED, /*is_nullable=*/ false,
-                               /*is_immutable=*/ false, &read_default);
+                               /*is_immutable=*/ false, /*is_auto_incrementing=*/ false,
+                               &read_default);
       col_ids.emplace_back(schema_.max_col_id() + 1);
     }
     Schema projection(col_schemas, col_ids, 1);
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index e9e85e5fc..f2a62d3cd 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -155,7 +155,8 @@ WriteOp::WriteOp(unique_ptr<WriteOpState> state, DriverType type)
 void WriteOp::NewReplicateMsg(unique_ptr<ReplicateMsg>* replicate_msg) {
   replicate_msg->reset(new ReplicateMsg);
   (*replicate_msg)->set_op_type(consensus::OperationType::WRITE_OP);
-  (*replicate_msg)->mutable_write_request()->CopyFrom(*state()->request());
+  auto* write_req = (*replicate_msg)->mutable_write_request();
+  write_req->CopyFrom(*state()->request());
   if (state()->are_results_tracked()) {
     (*replicate_msg)->mutable_request_id()->CopyFrom(state()->request_id());
   }
@@ -188,8 +189,22 @@ Status WriteOp::Prepare() {
       return s;
     }
   }
-
-  s = tablet->DecodeWriteOperations(&client_schema, state());
+  // On the leader replica, record the tablet's current auto-incrementing counter in the
+  // Raft replicate message. On a follower replica, reset the local counter to the value
+  // carried by the replicate message before the write operations are decoded.
+  bool is_leader = type() == consensus::LEADER;
+  if (state_->consensus_round() &&
+      state_->tablet_replica()->tablet()->schema()->has_auto_incrementing()) {
+    if (is_leader) {
+      auto* write_req = state_->consensus_round()->replicate_msg()->mutable_write_request();
+      write_req->mutable_auto_incrementing_column()->set_auto_incrementing_counter(
+          tablet->GetAutoIncrementingCounter());
+    } else {
+      tablet->SetAutoIncrementingCounter(state_->consensus_round()->
+          replicate_msg()->write_request().auto_incrementing_column().auto_incrementing_counter());
+    }
+  }
+  s = tablet->DecodeWriteOperations(&client_schema, state(), is_leader);
   if (!s.ok()) {
     // TODO(unknown): is MISMATCHED_SCHEMA always right here? probably not.
     state()->completion_callback()->set_error(s, TabletServerErrorPB::MISMATCHED_SCHEMA);
diff --git a/src/kudu/tablet/tablet-decoder-eval-test.cc b/src/kudu/tablet/tablet-decoder-eval-test.cc
index b64f76f56..972480a68 100644
--- a/src/kudu/tablet/tablet-decoder-eval-test.cc
+++ b/src/kudu/tablet/tablet-decoder-eval-test.cc
@@ -76,11 +76,11 @@ public:
   TabletDecoderEvalTest()
           : KuduTabletTest(Schema({ColumnSchema("key", INT32),
                                    ColumnSchema("string_val_a", STRING, true, false,
-                                                nullptr, nullptr,
+                                                false, nullptr, nullptr,
                                                 ColumnStorageAttributes(DICT_ENCODING,
                                                                         DEFAULT_COMPRESSION)),
                                    ColumnSchema("string_val_b", STRING, true, false,
-                                                nullptr, nullptr,
+                                                false, nullptr, nullptr,
                                                 ColumnStorageAttributes(DICT_ENCODING,
                                                                         DEFAULT_COMPRESSION))}, 1))
   {}
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 62cd8b5f2..a013301af 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -792,7 +792,8 @@ static inline Schema GetRandomProjection(const Schema& schema,
   if (allow == AllowIsDeleted::YES && prng->Uniform(10) == 0) {
     bool read_default = false;
     projected_cols.emplace_back("is_deleted", IS_DELETED, /*is_nullable=*/ false,
-                                /*is_immutable=*/ false, &read_default);
+                                /*is_immutable=*/ false, /*is_auto_incrementing=*/ false,
+                                &read_default);
     projected_col_ids.emplace_back(schema.max_col_id() + 1);
   }
   return Schema(projected_cols, projected_col_ids, 0);
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 20217318d..6728983de 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -1707,7 +1707,8 @@ TYPED_TEST(TestTablet, TestDiffScanUnobservableOperations) {
     vector<ColumnSchema> col_schemas(this->client_schema().columns());
     bool read_default = false;
     col_schemas.emplace_back("is_deleted", IS_DELETED, /*is_nullable=*/ false,
-                             /*is_immutable=*/ false, &read_default);
+                             /*is_immutable=*/ false, /*is_auto_incrementing=*/ false,
+                             &read_default);
     Schema projection(col_schemas, this->client_schema().num_key_columns());
 
     // Do the diff scan.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index d2afc7a94..d33c653c5 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -333,6 +333,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
       log_anchor_registry_(std::move(log_anchor_registry)),
       mem_trackers_(tablet_id(), std::move(parent_mem_tracker)),
       next_mrs_id_(0),
+      auto_incrementing_counter_(0),
       clock_(clock),
       txn_participant_(metadata_),
       rowsets_flush_sem_(1),
@@ -596,7 +597,8 @@ Status Tablet::NewRowIterator(RowIteratorOptions opts,
 }
 
 Status Tablet::DecodeWriteOperations(const Schema* client_schema,
-                                     WriteOpState* op_state) {
+                                     WriteOpState* op_state,
+                                     bool is_leader) {
   TRACE_EVENT0("tablet", "Tablet::DecodeWriteOperations");
 
   DCHECK(op_state->row_ops().empty());
@@ -616,7 +618,7 @@ Status Tablet::DecodeWriteOperations(const Schema* client_schema,
                              client_schema,
                              schema_ptr.get(),
                              op_state->arena());
-  RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
+  RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops, &auto_incrementing_counter_));
   TRACE_COUNTER_INCREMENT("num_ops", ops.size());
 
   // Important to set the schema before the ops -- we need the
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 7a56e0ce3..06f6a2a17 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -156,7 +156,8 @@ class Tablet {
   // Decode the Write (insert/mutate) operations from within a user's
   // request.
   Status DecodeWriteOperations(const Schema* client_schema,
-                               WriteOpState* op_state);
+                               WriteOpState* op_state,
+                               bool is_leader = false);
 
   // Acquire locks for each of the operations in the given write op.
   // This also sets the row op's RowSetKeyProbe.
@@ -538,6 +539,13 @@ class Tablet {
                                              int64_t* replay_size = nullptr,
                                              MonoTime* earliest_dms_time = nullptr) const;
 
+  int64_t GetAutoIncrementingCounter() const {  // Plain read of the counter; no locking.
+    return auto_incrementing_counter_;
+  }
+
+  void SetAutoIncrementingCounter(int64_t auto_incrementing_counter) {  // Plain overwrite.
+    auto_incrementing_counter_ = auto_incrementing_counter;
+  }
  private:
   friend class kudu::AlterTableTest;
   friend class Iterator;
@@ -796,6 +804,10 @@ class Tablet {
 
   int64_t next_mrs_id_;
 
+  // Counter for an auto-incrementing column. It is expected that this is only
+  // updated by the prepare thread.
+  int64_t auto_incrementing_counter_;
+
   // A pointer to the server's clock.
   clock::Clock* clock_;
 
diff --git a/src/kudu/tablet/tablet_auto_incrementing-test.cc b/src/kudu/tablet/tablet_auto_incrementing-test.cc
new file mode 100644
index 000000000..0ea250ed5
--- /dev/null
+++ b/src/kudu/tablet/tablet_auto_incrementing-test.cc
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/iterator.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/local_tablet_writer.h"
+#include "kudu/tablet/tablet-test-util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tablet {
+
+namespace {
+Schema CreateAutoIncrementingTestSchema() {
+  const ColumnSchema key_col("key", INT64, /*is_nullable=*/ false, /*is_immutable=*/ false,
+                             /*is_auto_incrementing=*/ true, nullptr, nullptr, {}, {}, "");
+  return Schema({ key_col, ColumnSchema("val", INT32, /*is_nullable=*/ true) }, 1);
+}
+} // anonymous namespace
+
+// Test fixture: sets up a single tablet whose schema has an auto-incrementing
+// key column, along with a LocalTabletWriter for applying row operations to it.
+class AutoIncrementingTabletTest : public KuduTabletTest {
+ public:
+  AutoIncrementingTabletTest()
+      : KuduTabletTest(CreateAutoIncrementingTestSchema()) {}
+
+  void SetUp() override {
+    KuduTabletTest::SetUp();
+    writer_ = std::make_unique<LocalTabletWriter>(tablet().get(), &client_schema_);
+  }
+ protected:
+  unique_ptr<LocalTabletWriter> writer_;  // Created in SetUp().
+};
+
+TEST_F(AutoIncrementingTabletTest, TestInsertOp) {
+  // Insert rows setting only 'val'; the tablet assigns the auto-incrementing key.
+  for (int i = 0; i < 10; i++) {
+    KuduPartialRow row(&client_schema_);
+    ASSERT_OK(row.SetInt32(1, 1337));
+    ASSERT_OK(writer_->Insert(row));
+  }
+  // Scan the tablet back; the i-th returned row is expected to carry key i + 1.
+  unique_ptr<RowwiseIterator> iter;
+  ASSERT_OK(tablet()->NewRowIterator(schema_.CopyWithoutColumnIds(), &iter));
+  ASSERT_OK(iter->Init(nullptr));
+  vector<string> out;
+  ASSERT_OK(IterateToStringList(iter.get(), &out));  // Do not drop a scan failure.
+  ASSERT_EQ(10, out.size());  // Guards the out[i] accesses below.
+  for (int i = 0; i < 10; i++) {
+    ASSERT_STR_CONTAINS(out[i], Substitute("int64 key=$0", i + 1));
+  }
+}
+
+TEST_F(AutoIncrementingTabletTest, TestInsertOpWithAutoIncrementSet) {
+  // An INSERT that supplies a value for the auto-incrementing column must be rejected.
+  KuduPartialRow row(&client_schema_);
+  ASSERT_OK(row.SetInt64(0, 10));
+  ASSERT_OK(row.SetInt32(1, 1337));
+  const Status status = writer_->Insert(row);
+  ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
+  ASSERT_EQ("Invalid argument: auto-incrementing column is incorrectly set", status.ToString());
+}
+
+TEST_F(AutoIncrementingTabletTest, TestUpsertOp) {
+  // UPSERT is rejected for a table with an auto-incrementing column, even when
+  // the row sets only the regular 'val' column.
+  KuduPartialRow row(&client_schema_);
+  ASSERT_OK(row.SetInt32(1, 1337));
+  const Status status = writer_->Upsert(row);
+  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
+  ASSERT_EQ("Not implemented: Tables with auto-incrementing "
+            "column do not support UPSERT operations", status.ToString());
+}
+
+TEST_F(AutoIncrementingTabletTest, TestUpsertIgnoreOp) {
+  // UPSERT_IGNORE is rejected for a table with an auto-incrementing column, even
+  // when the row sets only the regular 'val' column.
+  KuduPartialRow row(&client_schema_);
+  ASSERT_OK(row.SetInt32(1, 1337));
+  const Status status = writer_->UpsertIgnore(row);
+  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
+  ASSERT_EQ("Not implemented: Tables with auto-incrementing "
+            "column do not support UPSERT_IGNORE operations", status.ToString());
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index bd19626ce..dce3f7118 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3178,13 +3178,13 @@ void ToolTest::RunLoadgen(int num_tservers,
         ColumnSchema("int64_val", INT64),
         ColumnSchema("float_val", FLOAT),
         ColumnSchema("double_val", DOUBLE),
-        ColumnSchema("decimal32_val", DECIMAL32, false, false,
+        ColumnSchema("decimal32_val", DECIMAL32, false, false, false,
                      nullptr, nullptr, ColumnStorageAttributes(),
                      ColumnTypeAttributes(9, 9)),
-        ColumnSchema("decimal64_val", DECIMAL64, false, false,
+        ColumnSchema("decimal64_val", DECIMAL64, false, false, false,
                      nullptr, nullptr, ColumnStorageAttributes(),
                      ColumnTypeAttributes(18, 2)),
-        ColumnSchema("decimal128_val", DECIMAL128, false, false,
+        ColumnSchema("decimal128_val", DECIMAL128, false, false, false,
                      nullptr, nullptr, ColumnStorageAttributes(),
                      ColumnTypeAttributes(38, 0)),
         ColumnSchema("unixtime_micros_val", UNIXTIME_MICROS),
diff --git a/src/kudu/tserver/tablet_server_authorization-test.cc b/src/kudu/tserver/tablet_server_authorization-test.cc
index 69ec62b1a..6ec8c43bd 100644
--- a/src/kudu/tserver/tablet_server_authorization-test.cc
+++ b/src/kudu/tserver/tablet_server_authorization-test.cc
@@ -475,9 +475,9 @@ string GenerateEncodedKey(int32_t val, const Schema& schema) {
 // Returns a column schema PB that matches 'col', but has a different name.
 void MisnamedColumnSchemaToPB(const ColumnSchema& col, ColumnSchemaPB* pb) {
   ColumnSchemaToPB(ColumnSchema(kDummyColumn, col.type_info()->physical_type(), col.is_nullable(),
-                                col.is_immutable(), col.read_default_value(),
-                                col.write_default_value(), col.attributes(),
-                                col.type_attributes()), pb);
+                                col.is_immutable(), col.is_auto_incrementing(),
+                                col.read_default_value(), col.write_default_value(),
+                                col.attributes(), col.type_attributes()), pb);
 }
 
 } // anonymous namespace
@@ -596,6 +596,7 @@ class ScanPrivilegeAuthzTest : public AuthzTabletServerTestBase,
       bool default_bool = false;
       ColumnSchemaToPB(ColumnSchema("is_deleted", DataType::IS_DELETED, /*is_nullable=*/false,
                                     /*is_immutable=*/false,
+                                    /*is_auto_incrementing=*/false,
                                     /*read_default=*/&default_bool, nullptr), projected_column);
     }
     CHECK_OK(GenerateScanAuthzToken(privilege, pb.mutable_authz_token()));
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 8f6b7f0e6..dfceae8cb 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -136,6 +136,11 @@ message PingRequestPB {
 message PingResponsePB {
 }
 
+message AutoIncrementingColumnPB {
+  // Auto-incrementing counter value the leader recorded when preparing this write op.
+  optional int64 auto_incrementing_counter = 1;
+}
+
 // A batched set of insert/mutate requests.
 message WriteRequestPB {
   required bytes tablet_id = 1;
@@ -161,6 +166,9 @@ message WriteRequestPB {
 
   // The transaction ID associated with this write request, if any.
   optional int64 txn_id = 7;
+
+  // Auto-incrementing column state; the leader replica fills it in the replicated request.
+  optional AutoIncrementingColumnPB auto_incrementing_column = 8;
 }
 
 message WriteResponsePB {