You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/03 22:59:31 UTC

[1/2] kudu git commit: c++ client: remove unnecessary code

Repository: kudu
Updated Branches:
  refs/heads/master ec39756ef -> 7ca727396


c++ client: remove unnecessary code

1. GetTableSchema() was implemented using its own RPC instead of a much
   simpler call to SyncLeaderMasterRpc(). An RPC is attractive for
   asynchronous use, but since it's never used that way, let's just switch
   over to SyncLeaderMasterRpc().
2. KuduTable::Open() was issuing both GetTableSchema() and
   GetTableLocations() RPCs. It's not clear why we're doing the latter; the
   response is completely ignored. So let's stop doing that.

My main motivation is to remove fragile error-handling and retry logic which
has been duplicated all over the client.

Change-Id: Idda2cc15bc6224df992bfe2eec287c0222adced0
Reviewed-on: http://gerrit.cloudera.org:8080/3809
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7766e927
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7766e927
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7766e927

Branch: refs/heads/master
Commit: 7766e9270bb8351153656c1d70d2674434d420ab
Parents: ec39756
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jul 27 23:03:01 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 3 22:46:21 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.cc | 220 +++++---------------------------
 src/kudu/client/client.cc          |  11 +-
 src/kudu/client/table-internal.cc  | 105 ---------------
 src/kudu/client/table-internal.h   |   2 -
 4 files changed, 34 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7766e927/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 69d643d..aa2f98e 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <limits>
+#include <memory>
 #include <mutex>
 #include <string>
 #include <vector>
@@ -46,6 +47,7 @@
 using std::set;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -453,11 +455,6 @@ Status KuduClient::Data::AlterTable(KuduClient* client,
           &MasterServiceProxy::AlterTable,
           std::move(required_feature_flags));
   RETURN_NOT_OK(s);
-  // TODO: Consider the situation where the request is sent to the
-  // server, gets executed on the server and written to the server,
-  // but is seen as failed by the client, and is then retried (in which
-  // case the retry will fail due to original table being removed, a
-  // column being already added, etc...)
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -544,197 +541,40 @@ bool KuduClient::Data::IsTabletServerLocal(const RemoteTabletServer& rts) const
   return false;
 }
 
-namespace internal {
-
-// Gets a table's schema from the leader master. If the leader master
-// is down, waits for a new master to become the leader, and then gets
-// the table schema from the new leader master.
-//
-// TODO: When we implement the next fault tolerant client-master RPC
-// call (e.g., CreateTable/AlterTable), we should generalize this
-// method as to enable code sharing.
-class GetTableSchemaRpc : public Rpc {
- public:
-  GetTableSchemaRpc(KuduClient* client,
-                    StatusCallback user_cb,
-                    string table_name,
-                    KuduSchema* out_schema,
-                    PartitionSchema* out_partition_schema,
-                    string* out_id,
-                    const MonoTime& deadline,
-                    const shared_ptr<rpc::Messenger>& messenger);
-
-  virtual void SendRpc() OVERRIDE;
-
-  virtual string ToString() const OVERRIDE;
-
-  virtual ~GetTableSchemaRpc();
-
- private:
-  virtual void SendRpcCb(const Status& status) OVERRIDE;
-
-  void ResetLeaderMasterAndRetry();
-
-  void NewLeaderMasterDeterminedCb(const Status& status);
-
-  KuduClient* client_;
-  StatusCallback user_cb_;
-  const string table_name_;
-  KuduSchema* out_schema_;
-  PartitionSchema* out_partition_schema_;
-  string* out_id_;
-  GetTableSchemaResponsePB resp_;
-};
-
-GetTableSchemaRpc::GetTableSchemaRpc(KuduClient* client,
-                                     StatusCallback user_cb,
-                                     string table_name,
-                                     KuduSchema* out_schema,
-                                     PartitionSchema* out_partition_schema,
-                                     string* out_id,
-                                     const MonoTime& deadline,
-                                     const shared_ptr<rpc::Messenger>& messenger)
-    : Rpc(deadline, messenger),
-      client_(DCHECK_NOTNULL(client)),
-      user_cb_(std::move(user_cb)),
-      table_name_(std::move(table_name)),
-      out_schema_(DCHECK_NOTNULL(out_schema)),
-      out_partition_schema_(DCHECK_NOTNULL(out_partition_schema)),
-      out_id_(DCHECK_NOTNULL(out_id)) {
-}
-
-GetTableSchemaRpc::~GetTableSchemaRpc() {
-}
-
-void GetTableSchemaRpc::SendRpc() {
-  MonoTime now = MonoTime::Now(MonoTime::FINE);
-  if (retrier().deadline().ComesBefore(now)) {
-    SendRpcCb(Status::TimedOut("GetTableSchema timed out after deadline expired"));
-    return;
-  }
-
-  // See KuduClient::Data::SyncLeaderMasterRpc().
-  MonoTime rpc_deadline = now;
-  rpc_deadline.AddDelta(client_->default_rpc_timeout());
-  mutable_retrier()->mutable_controller()->set_deadline(
-      MonoTime::Earliest(rpc_deadline, retrier().deadline()));
-
-  GetTableSchemaRequestPB req;
-  req.mutable_table()->set_table_name(table_name_);
-  client_->data_->master_proxy()->GetTableSchemaAsync(
-      req, &resp_,
-      mutable_retrier()->mutable_controller(),
-      boost::bind(&GetTableSchemaRpc::SendRpcCb, this, Status::OK()));
-}
-
-string GetTableSchemaRpc::ToString() const {
-  return Substitute("GetTableSchemaRpc(table_name: $0, num_attempts: $1)",
-                    table_name_, num_attempts());
-}
-
-void GetTableSchemaRpc::ResetLeaderMasterAndRetry() {
-  client_->data_->SetMasterServerProxyAsync(
-      client_,
-      retrier().deadline(),
-      Bind(&GetTableSchemaRpc::NewLeaderMasterDeterminedCb,
-           Unretained(this)));
-}
-
-void GetTableSchemaRpc::NewLeaderMasterDeterminedCb(const Status& status) {
-  if (status.ok()) {
-    mutable_retrier()->mutable_controller()->Reset();
-    SendRpc();
-  } else {
-    LOG(WARNING) << "Failed to determine new Master: " << status.ToString();
-    mutable_retrier()->DelayedRetry(this, status);
-  }
-}
-
-void GetTableSchemaRpc::SendRpcCb(const Status& status) {
-  Status new_status = status;
-  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
-    return;
-  }
-
-  if (new_status.ok() && resp_.has_error()) {
-    if (resp_.error().code() == MasterErrorPB::NOT_THE_LEADER ||
-        resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
-      if (client_->IsMultiMaster()) {
-        LOG(WARNING) << "Leader Master has changed ("
-                     << client_->data_->leader_master_hostport().ToString()
-                     << " is no longer the leader), re-trying...";
-        ResetLeaderMasterAndRetry();
-        return;
-      }
-    }
-    new_status = StatusFromPB(resp_.error().status());
-  }
-
-  if (new_status.IsTimedOut()) {
-    if (MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
-      if (client_->IsMultiMaster()) {
-        LOG(WARNING) << "Leader Master ("
-            << client_->data_->leader_master_hostport().ToString()
-            << ") timed out, re-trying...";
-        ResetLeaderMasterAndRetry();
-        return;
-      }
-    } else {
-      // Operation deadline expired during this latest RPC.
-      new_status = new_status.CloneAndPrepend(
-          "GetTableSchema timed out after deadline expired");
-    }
-  }
-
-  if (new_status.IsNetworkError()) {
-    if (client_->IsMultiMaster()) {
-      LOG(WARNING) << "Encountered a network error from the Master("
-                   << client_->data_->leader_master_hostport().ToString() << "): "
-                   << new_status.ToString() << ", retrying...";
-      ResetLeaderMasterAndRetry();
-      return;
-    }
-  }
-
-  if (new_status.ok()) {
-    gscoped_ptr<Schema> schema(new Schema());
-    new_status = SchemaFromPB(resp_.schema(), schema.get());
-    if (new_status.ok()) {
-      delete out_schema_->schema_;
-      out_schema_->schema_ = schema.release();
-      new_status = PartitionSchema::FromPB(resp_.partition_schema(),
-                                           *out_schema_->schema_,
-                                           out_partition_schema_);
-
-      *out_id_ = resp_.table_id();
-      CHECK_GT(out_id_->size(), 0) << "Running against a too-old master";
-    }
-  }
-  if (!new_status.ok()) {
-    LOG(WARNING) << ToString() << " failed: " << new_status.ToString();
-  }
-  user_cb_.Run(new_status);
-}
-
-} // namespace internal
-
 Status KuduClient::Data::GetTableSchema(KuduClient* client,
                                         const string& table_name,
                                         const MonoTime& deadline,
                                         KuduSchema* schema,
                                         PartitionSchema* partition_schema,
                                         string* table_id) {
-  Synchronizer sync;
-  GetTableSchemaRpc rpc(client,
-                        sync.AsStatusCallback(),
-                        table_name,
-                        schema,
-                        partition_schema,
-                        table_id,
-                        deadline,
-                        messenger_);
-  rpc.SendRpc();
-  return sync.Wait();
+  GetTableSchemaRequestPB req;
+  GetTableSchemaResponsePB resp;
+
+  req.mutable_table()->set_table_name(table_name);
+  Status s = SyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB>(
+      deadline, client, req, &resp,
+      "GetTableSchema", &MasterServiceProxy::GetTableSchema, {});
+  RETURN_NOT_OK(s);
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+
+  // Parse the server schema out of the response.
+  unique_ptr<Schema> new_schema(new Schema());
+  RETURN_NOT_OK(SchemaFromPB(resp.schema(), new_schema.get()));
+
+  // Parse the server partition schema out of the response.
+  PartitionSchema new_partition_schema;
+  RETURN_NOT_OK(PartitionSchema::FromPB(resp.partition_schema(),
+                                        *new_schema.get(),
+                                        &new_partition_schema));
+
+  // Parsing was successful; release the schemas to the user.
+  delete schema->schema_;
+  schema->schema_ = new_schema.release();
+  *partition_schema = std::move(new_partition_schema);
+  *table_id = resp.table_id();
+  return Status::OK();
 }
 
 void KuduClient::Data::LeaderMasterDetermined(const Status& status,

http://git-wip-us.apache.org/repos/asf/kudu/blob/7766e927/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 6999a01..f0fba94 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -384,13 +384,10 @@ Status KuduClient::OpenTable(const string& table_name,
                                       &partition_schema,
                                       &table_id));
 
-  // In the future, probably will look up the table in some map to reuse KuduTable
-  // instances.
-  shared_ptr<KuduTable> ret(new KuduTable(shared_from_this(), table_name, table_id,
-                                          schema, partition_schema));
-  RETURN_NOT_OK(ret->data_->Open());
-  table->swap(ret);
-
+  // TODO: in the future, probably will look up the table in some map to reuse
+  // KuduTable instances.
+  table->reset(new KuduTable(shared_from_this(), table_name, table_id,
+                             schema, partition_schema));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/7766e927/src/kudu/client/table-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/table-internal.cc b/src/kudu/client/table-internal.cc
index c9aa5dd..ec0df68 100644
--- a/src/kudu/client/table-internal.cc
+++ b/src/kudu/client/table-internal.cc
@@ -19,22 +19,7 @@
 
 #include <string>
 
-#include "kudu/client/client-internal.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/util/monotime.h"
-
 namespace kudu {
-
-using master::GetTableLocationsRequestPB;
-using master::GetTableLocationsResponsePB;
-using rpc::RpcController;
-using std::string;
-
 namespace client {
 
 using sp::shared_ptr;
@@ -54,95 +39,5 @@ KuduTable::Data::Data(shared_ptr<KuduClient> client,
 KuduTable::Data::~Data() {
 }
 
-Status KuduTable::Data::Open() {
-  // TODO: fetch the schema from the master here once catalog is available.
-  GetTableLocationsRequestPB req;
-  GetTableLocationsResponsePB resp;
-
-  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  deadline.AddDelta(client_->default_admin_operation_timeout());
-
-  req.mutable_table()->set_table_id(id_);
-  Status s;
-  // TODO: replace this with Async RPC-retrier based RPC in the next revision,
-  // adding exponential backoff and allowing this to be used safely in a
-  // a reactor thread.
-  while (true) {
-    RpcController rpc;
-
-    // Have we already exceeded our deadline?
-    MonoTime now = MonoTime::Now(MonoTime::FINE);
-    if (deadline.ComesBefore(now)) {
-      const char* msg = "OpenTable timed out after deadline expired";
-      LOG(ERROR) << msg;
-      return Status::TimedOut(msg);
-    }
-
-    // See KuduClient::Data::SyncLeaderMasterRpc().
-    MonoTime rpc_deadline = now;
-    rpc_deadline.AddDelta(client_->default_rpc_timeout());
-    rpc.set_deadline(MonoTime::Earliest(rpc_deadline, deadline));
-
-    s = client_->data_->master_proxy()->GetTableLocations(req, &resp, &rpc);
-    if (!s.ok()) {
-      // Various conditions cause us to look for the leader master again.
-      // It's ok if that eventually fails; we'll retry over and over until
-      // the deadline is reached.
-
-      if (s.IsNetworkError()) {
-        LOG(WARNING) << "Network error talking to the leader master ("
-                     << client_->data_->leader_master_hostport().ToString() << "): "
-                     << s.ToString();
-        if (client_->IsMultiMaster()) {
-          LOG(INFO) << "Determining the leader master again and retrying.";
-          WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
-                      "Failed to determine new Master");
-          continue;
-        }
-      }
-
-      if (s.IsTimedOut()
-          && MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
-        // If the RPC timed out and the operation deadline expired, we'll loop
-        // again and time out for good above.
-        LOG(WARNING) << "Timed out talking to the leader master ("
-                     << client_->data_->leader_master_hostport().ToString() << "): "
-                     << s.ToString();
-        if (client_->IsMultiMaster()) {
-          LOG(INFO) << "Determining the leader master again and retrying.";
-          WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
-                      "Failed to determine new Master");
-          continue;
-        }
-      }
-    }
-    if (s.ok() && resp.has_error()) {
-      if (resp.error().code() == master::MasterErrorPB::NOT_THE_LEADER ||
-          resp.error().code() == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
-        LOG(WARNING) << "Master " << client_->data_->leader_master_hostport().ToString()
-                     << " is no longer the leader master.";
-        if (client_->IsMultiMaster()) {
-          LOG(INFO) << "Determining the leader master again and retrying.";
-          WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
-                      "Failed to determine new Master");
-          continue;
-        }
-      }
-      s = StatusFromPB(resp.error().status());
-    }
-    if (s.ok()) {
-      break;
-    } else {
-      LOG(WARNING) << "Error getting table locations: " << s.ToString() << ", retrying.";
-    }
-
-    /* TODO: Use exponential backoff instead */
-    base::SleepForMilliseconds(100);
-  }
-
-  VLOG(1) << "Open Table " << name_ << ", found " << resp.tablet_locations_size() << " tablets";
-  return Status::OK();
-}
-
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7766e927/src/kudu/client/table-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/table-internal.h b/src/kudu/client/table-internal.h
index 0a56f0d..17c90f2 100644
--- a/src/kudu/client/table-internal.h
+++ b/src/kudu/client/table-internal.h
@@ -35,8 +35,6 @@ class KuduTable::Data {
        PartitionSchema partition_schema);
   ~Data();
 
-  Status Open();
-
   sp::shared_ptr<KuduClient> client_;
 
   std::string name_;


[2/2] kudu git commit: [KuduScanBatch::const_iterator] a minor clean-up

Posted by to...@apache.org.
[KuduScanBatch::const_iterator] a minor clean-up

More 'standardized' signature for the prefix increment operator.
Added postfix increment operator.  Changed the behavior of the
equality/non-equality operators: along with current row index,
also check whether iterators are built upon the same batch.
Added unit tests to cover the modified code.

Change-Id: I3eeba46e5bc45c9dd7b96abad60173381d6c3e39
Reviewed-on: http://gerrit.cloudera.org:8080/3834
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7ca72739
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7ca72739
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7ca72739

Branch: refs/heads/master
Commit: 7ca727396408b72e465ad2920beb85c6f8b01234
Parents: 7766e92
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Aug 2 14:57:00 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 3 22:58:41 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc | 97 +++++++++++++++++++++++++++++++++++++
 src/kudu/client/scan_batch.h   | 33 ++++++++++---
 2 files changed, 123 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7ca72739/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index e50a1c8..dd242d6 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -3067,5 +3067,102 @@ TEST_F(ClientTest, TestNoDefaultPartitioning) {
     ASSERT_STR_CONTAINS(s.ToString(), "Table partitioning must be specified");
 }
 
+TEST_F(ClientTest, TestBatchScanConstIterator) {
+  // Check for iterator behavior for an empty batch.
+  {
+    KuduScanBatch empty_batch;
+    ASSERT_EQ(0, empty_batch.NumRows());
+    ASSERT_TRUE(empty_batch.begin() == empty_batch.end());
+  }
+
+  {
+    // Insert a few rows
+    const int ROW_NUM = 2;
+    ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(), ROW_NUM));
+
+    KuduScanner scanner(client_table_.get());
+    ASSERT_OK(scanner.Open());
+
+    // Do a scan
+    KuduScanBatch batch;
+    ASSERT_TRUE(scanner.HasMoreRows());
+    ASSERT_OK(scanner.NextBatch(&batch));
+    const int ref_count(batch.NumRows());
+    ASSERT_EQ(ROW_NUM, ref_count);
+
+    {
+      KuduScanBatch::const_iterator it_next = batch.begin();
+      std::advance(it_next, 1);
+
+      KuduScanBatch::const_iterator it = batch.begin();
+      ASSERT_TRUE(++it == it_next);
+      ASSERT_TRUE(it == it_next);
+    }
+
+    {
+      KuduScanBatch::const_iterator it_end = batch.begin();
+      std::advance(it_end, ROW_NUM);
+      ASSERT_TRUE(batch.end() == it_end);
+    }
+
+    {
+      KuduScanBatch::const_iterator it(batch.begin());
+      ASSERT_TRUE(it++ == batch.begin());
+
+      KuduScanBatch::const_iterator it_next(batch.begin());
+      ASSERT_TRUE(++it_next == it);
+    }
+
+    // Check the prefix increment iterator.
+    {
+      int count = 0;
+      for (KuduScanBatch::const_iterator it = batch.begin();
+           it != batch.end(); ++it) {
+          ++count;
+      }
+      CHECK_EQ(ref_count, count);
+    }
+
+    // Check the postfix increment iterator.
+    {
+      int count = 0;
+      for (KuduScanBatch::const_iterator it = batch.begin();
+           it != batch.end(); it++) {
+          ++count;
+      }
+      CHECK_EQ(ref_count, count);
+    }
+
+    {
+      KuduScanBatch::const_iterator it_pre(batch.begin());
+      KuduScanBatch::const_iterator it_post(batch.begin());
+      for (; it_pre != batch.end(); ++it_pre, it_post++) {
+          ASSERT_TRUE(it_pre == it_post);
+          ASSERT_FALSE(it_pre != it_post);
+      }
+    }
+
+    // Check that iterators which are going over different batches
+    // are different, even if they iterate over the same raw data.
+    {
+      KuduScanner other_scanner(client_table_.get());
+      ASSERT_OK(other_scanner.Open());
+
+      KuduScanBatch other_batch;
+      ASSERT_TRUE(other_scanner.HasMoreRows());
+      ASSERT_OK(other_scanner.NextBatch(&other_batch));
+      const int other_ref_count(other_batch.NumRows());
+      ASSERT_EQ(ROW_NUM, other_ref_count);
+
+      KuduScanBatch::const_iterator it(batch.begin());
+      KuduScanBatch::const_iterator other_it(other_batch.begin());
+      for (; it != batch.end(); ++it, ++other_it) {
+          ASSERT_FALSE(it == other_it);
+          ASSERT_TRUE(it != other_it);
+      }
+    }
+  }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7ca72739/src/kudu/client/scan_batch.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_batch.h b/src/kudu/client/scan_batch.h
index 9ca593e..7808905 100644
--- a/src/kudu/client/scan_batch.h
+++ b/src/kudu/client/scan_batch.h
@@ -51,6 +51,14 @@ class KuduSchema;
 //
 // In C++03, you'll need to use a regular for loop:
 //
+//   for (KuduScanBatch::const_iterator it = batch.begin(); it != batch.end();
+//        ++it) {
+//     KuduScanBatch::RowPtr row(*it);
+//     ...
+//   }
+//
+//   or
+//
 //   for (int i = 0, num_rows = batch.NumRows();
 //        i < num_rows;
 //        i++) {
@@ -181,15 +189,26 @@ class KUDU_EXPORT KuduScanBatch::const_iterator
     return batch_->Row(idx_);
   }
 
-  void operator++() {
-    idx_++;
+  // Prefix increment operator: advances the iterator to the next position.
+  // Returns the reference to the incremented/advanced iterator.
+  const_iterator& operator++() {
+    ++idx_;
+    return *this;
+  }
+
+  // Postfix increment operator: advances the iterator to the next position.
+  // Returns a copy of the iterator pointing to the pre-incremented position.
+  const_iterator operator++(int) {
+    const_iterator tmp(batch_, idx_);
+    ++idx_;
+    return tmp;
   }
 
-  bool operator==(const const_iterator& other) {
-    return idx_ == other.idx_;
+  bool operator==(const const_iterator& other) const {
+    return (idx_ == other.idx_) && (batch_ == other.batch_);
   }
-  bool operator!=(const const_iterator& other) {
-    return idx_ != other.idx_;
+  bool operator!=(const const_iterator& other) const {
+    return !(*this == other);
   }
 
  private:
@@ -199,7 +218,7 @@ class KUDU_EXPORT KuduScanBatch::const_iterator
         idx_(idx) {
   }
 
-  const KuduScanBatch* batch_;
+  const KuduScanBatch* const batch_;
   int idx_;
 };