You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by dr...@apache.org on 2017/05/03 22:18:52 UTC

kudu git commit: Expose row format flags in KuduScanner

Repository: kudu
Updated Branches:
  refs/heads/master 899e6a5e5 -> 75333640b


Expose row format flags in KuduScanner

This adds a way to pass row format modifier flags to KuduScanner,
encoded in an int64_t as a bitset.

This API is marked as advanced and it's explicitly called
out that the user is responsible for knowing the row data format
that results from setting such flags and decoding the row data.

The only use for these flags, presently, is to set
PAD_UNIXTIME_MICROS_TO_16_BYTES, making sure that the server
pads slots for UNIXTIME_MICROS with an additional 8 bytes to
the left. In the future we might use these flags to (temporarily)
provide old/new row formats without having to change all the
row apis at once.

This adds a new test to all_types-itest that tests reading data
with the padding. The test scans a random projection to test
the padded column(s) in different positions.

Change-Id: I043b6514dc5fc307fc9c94eb41f3ae79796ba273
Reviewed-on: http://gerrit.cloudera.org:8080/6624
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/75333640
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/75333640
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/75333640

Branch: refs/heads/master
Commit: 75333640b6c07ee330450253bcb0787f6e9bc986
Parents: 899e6a5
Author: David Alves <dr...@apache.org>
Authored: Tue May 2 00:25:01 2017 -0700
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Wed May 3 22:17:47 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client.cc                     |  28 ++-
 src/kudu/client/client.h                      |  42 ++++
 src/kudu/client/scan_configuration.cc         |   8 +-
 src/kudu/client/scan_configuration.h          |   8 +
 src/kudu/client/scanner-internal.cc           |  22 +-
 src/kudu/client/scanner-internal.h            |   8 +
 src/kudu/integration-tests/all_types-itest.cc | 239 +++++++++++++++++----
 src/kudu/tools/tool_action_remote_replica.cc  |   1 +
 src/kudu/tserver/scanners-test.cc             |   9 +-
 src/kudu/tserver/scanners.cc                  |  13 +-
 src/kudu/tserver/scanners.h                   |  11 +-
 src/kudu/tserver/tablet_service.cc            |  50 ++++-
 src/kudu/tserver/tserver.proto                |  17 ++
 13 files changed, 387 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 87d4e55..b1bf502 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1260,6 +1260,21 @@ KuduSchema KuduScanner::GetProjectionSchema() const {
   return KuduSchema(*data_->configuration().projection());
 }
 
+Status KuduScanner::SetRowFormatFlags(uint64_t flags) {
+  switch (flags) {
+    case NO_FLAGS:
+    case PAD_UNIXTIME_MICROS_TO_16_BYTES:
+      break;
+    default:
+      return Status::InvalidArgument(Substitute("Invalid row format flags: $0", flags));
+  }
+  if (data_->open_) {
+    return Status::IllegalState("Row format flags must be set before Open()");
+  }
+
+  return data_->mutable_configuration()->SetRowFormatFlags(flags);
+}
+
 const ResourceMetrics& KuduScanner::GetResourceMetrics() const {
   return data_->resource_metrics_;
 }
@@ -1356,6 +1371,11 @@ bool KuduScanner::HasMoreRows() const {
 }
 
 Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
+  if (PREDICT_FALSE(data_->configuration().row_format_flags() != KuduScanner::NO_FLAGS)) {
+    return Status::IllegalState(
+        Substitute("Cannot extract rows. Row format modifier flags were selected: $0",
+                   data_->configuration().row_format_flags()));
+  }
   RETURN_NOT_OK(NextBatch(&data_->batch_for_old_api_));
   data_->batch_for_old_api_.data_->ExtractRows(rows);
   return Status::OK();
@@ -1382,8 +1402,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     return batch->data_->Reset(&data_->controller_,
                                data_->configuration().projection(),
                                data_->configuration().client_projection(),
-                                make_gscoped_ptr(data_->last_response_.release_data()));
-  } else if (data_->last_response_.has_more_results()) {
+                               data_->configuration().row_format_flags(),
+                               make_gscoped_ptr(data_->last_response_.release_data()));
+  }
+
+  if (data_->last_response_.has_more_results()) {
     // More data is available in this tablet.
     VLOG(2) << "Continuing " << data_->DebugString();
 
@@ -1403,6 +1426,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
         return batch->data_->Reset(&data_->controller_,
                                    data_->configuration().projection(),
                                    data_->configuration().client_projection(),
+                                   data_->configuration().row_format_flags(),
                                    make_gscoped_ptr(data_->last_response_.release_data()));
       }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index a52c479..130b97d 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2024,6 +2024,48 @@ class KUDU_EXPORT KuduScanner {
   /// @return Schema of the projection being scanned.
   KuduSchema GetProjectionSchema() const;
 
+  /// @name Advanced/Unstable API
+  //
+  ///@{
+  /// Modifier flags for the row format returned from the server.
+  ///
+  /// @note Each flag corresponds to a bit that gets set on a bitset that is sent
+  ///   to the server. See SetRowFormatFlags() for example usage.
+  static const uint64_t NO_FLAGS = 0;
+  /// Makes the server pad UNIXTIME_MICROS slots to 16 bytes.
+  /// @note This flag actually wastes throughput by making messages larger than they need to
+  ///   be. It exists merely for compatibility reasons and requires the user to know the row
+  ///   format in order to decode the data. That is, if this flag is enabled, the user _must_
+  ///   use KuduScanBatch::direct_data() and KuduScanBatch::indirect_data() to obtain the row
+  ///   data for further decoding. Using KuduScanBatch::Row() might yield incorrect/corrupt
+  ///   results and might even cause the client to crash.
+  static const uint64_t PAD_UNIXTIME_MICROS_TO_16_BYTES = 1 << 0;
+  /// Optionally set row format modifier flags.
+  ///
+  /// If flags is RowFormatFlags::NO_FLAGS, then no modifications will be made to the row
+  /// format and the default will be used.
+  ///
+  /// Some flags require server-side support, thus the caller should be prepared to
+  /// handle a NotSupported status in Open() and NextBatch().
+  ///
+  /// Example usage (without error handling, for brevity):
+  /// @code
+  ///   KuduScanner scanner(...);
+  ///   uint64_t row_format_flags = KuduScanner::NO_FLAGS;
+  ///   row_format_flags |= KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES;
+  ///   scanner.SetRowFormatFlags(row_format_flags);
+  ///   scanner.Open();
+  ///   while (scanner.HasMoreRows()) {
+  ///     KuduScanBatch batch;
+  ///     scanner.NextBatch(&batch);
+  ///     Slice direct_data = batch.direct_data();
+  ///     Slice indirect_data = batch.indirect_data();
+  ///     ... // Row data decoding and handling.
+  ///   }
+  /// @endcode
+  Status SetRowFormatFlags(uint64_t flags);
+  ///@}
+
   /// @return String representation of this scan.
   ///
   /// @internal

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/client/scan_configuration.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index f396a99..26fb605 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -46,7 +46,8 @@ ScanConfiguration::ScanConfiguration(KuduTable* table)
       is_fault_tolerant_(false),
       snapshot_timestamp_(kNoTimestamp),
       timeout_(MonoDelta::FromMilliseconds(KuduScanner::kScanTimeoutMillis)),
-      arena_(1024, 1024 * 1024) {
+      arena_(1024, 1024 * 1024),
+      row_format_flags_(KuduScanner::NO_FLAGS) {
 }
 
 Status ScanConfiguration::SetProjectedColumnNames(const vector<string>& col_names) {
@@ -176,6 +177,11 @@ void ScanConfiguration::SetTimeoutMillis(int millis) {
   timeout_ = MonoDelta::FromMilliseconds(millis);
 }
 
+Status ScanConfiguration::SetRowFormatFlags(uint64_t flags) {
+  row_format_flags_ = flags;
+  return Status::OK();
+}
+
 void ScanConfiguration::OptimizeScanSpec() {
   spec_.OptimizeScan(*table_->schema().schema_,
                      &arena_,

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/client/scan_configuration.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h
index dba2d2d..cae083f 100644
--- a/src/kudu/client/scan_configuration.h
+++ b/src/kudu/client/scan_configuration.h
@@ -75,6 +75,8 @@ class ScanConfiguration {
 
   void SetTimeoutMillis(int millis);
 
+  Status SetRowFormatFlags(uint64_t flags);
+
   void OptimizeScanSpec();
 
   const KuduTable& table() {
@@ -133,6 +135,10 @@ class ScanConfiguration {
     return timeout_;
   }
 
+  uint64_t row_format_flags() const {
+    return row_format_flags_;
+  }
+
   Arena* arena() {
     return &arena_;
   }
@@ -173,6 +179,8 @@ class ScanConfiguration {
   // Manages objects which need to live for the lifetime of the configuration,
   // such as schemas, predicates, and keys.
   AutoReleasePool pool_;
+
+  uint64_t row_format_flags_;
 };
 
 } // namespace client

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 8ca16b7..21bf8ed 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -243,6 +243,9 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
   if (!configuration_.spec().predicates().empty()) {
     controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES);
   }
+  if (configuration().row_format_flags() & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) {
+    controller_.RequireServerFeature(TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES);
+  }
   ScanRpcStatus scan_status = AnalyzeResponse(
       proxy_->Scan(next_req_,
                    &last_response_,
@@ -261,6 +264,7 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   PrepareRequest(KuduScanner::Data::NEW);
   next_req_.clear_scanner_id();
   NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
+  scan->set_row_format_flags(configuration_.row_format_flags());
   const KuduScanner::ReadMode read_mode = configuration_.read_mode();
   switch (read_mode) {
     case KuduScanner::READ_LATEST:
@@ -485,7 +489,7 @@ void KuduScanner::Data::UpdateLastError(const Status& error) {
 // KuduScanBatch
 ////////////////////////////////////////////////////////////
 
-KuduScanBatch::Data::Data() : projection_(NULL) {}
+KuduScanBatch::Data::Data() : projection_(NULL), row_format_flags_(KuduScanner::NO_FLAGS) {}
 
 KuduScanBatch::Data::~Data() {}
 
@@ -497,12 +501,14 @@ size_t KuduScanBatch::Data::CalculateProjectedRowSize(const Schema& proj) {
 Status KuduScanBatch::Data::Reset(RpcController* controller,
                                   const Schema* projection,
                                   const KuduSchema* client_projection,
-                                  gscoped_ptr<RowwiseRowBlockPB> data) {
+                                  uint64_t row_format_flags,
+                                  gscoped_ptr<RowwiseRowBlockPB> resp_data) {
   CHECK(controller->finished());
   controller_.Swap(controller);
   projection_ = projection;
   client_projection_ = client_projection;
-  resp_data_.Swap(data.get());
+  row_format_flags_ = row_format_flags;
+  resp_data_.Swap(resp_data.get());
 
   // First, rewrite the relative addresses into absolute ones.
   if (PREDICT_FALSE(!resp_data_.has_rows_sidecar())) {
@@ -524,12 +530,20 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
     }
   }
 
-  RETURN_NOT_OK(RewriteRowBlockPointers(*projection_, resp_data_, indirect_data_, &direct_data_));
+  bool pad_unixtime_micros_to_16_bytes = false;
+  if (row_format_flags_ & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) {
+    pad_unixtime_micros_to_16_bytes = true;
+  }
+
+  RETURN_NOT_OK(RewriteRowBlockPointers(*projection_, resp_data_, indirect_data_, &direct_data_,
+                                        pad_unixtime_micros_to_16_bytes));
   projected_row_size_ = CalculateProjectedRowSize(*projection_);
   return Status::OK();
 }
 
 void KuduScanBatch::Data::ExtractRows(vector<KuduScanBatch::RowPtr>* rows) {
+  DCHECK_EQ(row_format_flags_, KuduScanner::NO_FLAGS) << "Cannot extract rows. "
+      << "Row format modifier flags were selected: " << row_format_flags_;
   int n_rows = resp_data_.num_rows();
   rows->resize(n_rows);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 154ba42..e3f3418 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -260,6 +260,7 @@ class KuduScanBatch::Data {
   Status Reset(rpc::RpcController* controller,
                const Schema* projection,
                const KuduSchema* client_projection,
+               uint64_t row_format_flags,
                gscoped_ptr<RowwiseRowBlockPB> resp_data);
 
   int num_rows() const {
@@ -267,6 +268,9 @@ class KuduScanBatch::Data {
   }
 
   KuduRowResult row(int idx) {
+    DCHECK_EQ(row_format_flags_, KuduScanner::NO_FLAGS)
+        << "Cannot decode individual rows. Row format flags were set: "
+        << row_format_flags_;
     DCHECK_GE(idx, 0);
     DCHECK_LT(idx, num_rows());
     int offset = idx * projected_row_size_;
@@ -297,6 +301,10 @@ class KuduScanBatch::Data {
   // The KuduSchema version of 'projection_'
   const KuduSchema* client_projection_;
 
+  // The row format flags that were passed to the KuduScanner.
+  // See: KuduScanner::SetRowFormatFlags()
+  uint64_t row_format_flags_;
+
   // The number of bytes of direct data for each row.
   size_t projected_row_size_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/integration-tests/all_types-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/all_types-itest.cc b/src/kudu/integration-tests/all_types-itest.cc
index 50419dc..cb3147e 100644
--- a/src/kudu/integration-tests/all_types-itest.cc
+++ b/src/kudu/integration-tests/all_types-itest.cc
@@ -81,21 +81,29 @@ struct SliceKeysTestSetup {
                                                                                   row_key_slice);
   }
 
-  Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
+  Status VerifyRowKeySlice(Slice row_key_slice, int split_idx, int row_idx) const {
     int expected_row_key_num = (split_idx * increment_) + row_idx;
     string expected_row_key = StringPrintf("%08x", expected_row_key_num);
     Slice expected_row_key_slice(expected_row_key);
-    Slice row_key;
-    RETURN_NOT_OK(result.Get<TypeTraits<KeyTypeWrapper::type> >(0, &row_key));
-    if (expected_row_key_slice.compare(row_key) != 0) {
+    if (expected_row_key_slice.compare(row_key_slice) != 0) {
       return Status::Corruption(strings::Substitute("Keys didn't match. Expected: $0 Got: $1",
                                                     expected_row_key_slice.ToDebugString(),
-                                                    row_key.ToDebugString()));
+                                                    row_key_slice.ToDebugString()));
     }
-
     return Status::OK();
   }
 
+  Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
+    Slice row_key;
+    RETURN_NOT_OK(result.Get<TypeTraits<KeyTypeWrapper::type>>(0, &row_key));
+    return VerifyRowKeySlice(row_key, split_idx, row_idx);
+  }
+
+  Status VerifyRowKeyRaw(const uint8_t* raw_key, int split_idx, int row_idx) const {
+    Slice row_key = *reinterpret_cast<const Slice*>(raw_key);
+    return VerifyRowKeySlice(row_key, split_idx, row_idx);
+  }
+
   int GetRowsPerTablet() const {
     return rows_per_tablet_;
   }
@@ -154,9 +162,7 @@ struct IntKeysTestSetup {
     return insert->mutable_row()->Set<TypeTraits<KeyTypeWrapper::type> >(0, val);
   }
 
-  Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
-    CppType val;
-    RETURN_NOT_OK(result.Get<TypeTraits<KeyTypeWrapper::type> >(0, &val));
+  Status VerifyIntRowKey(CppType val, int split_idx, int row_idx) const {
     int expected = (split_idx * increment_) + row_idx;
     if (val != expected) {
       return Status::Corruption(strings::Substitute("Keys didn't match. Expected: $0 Got: $1",
@@ -165,6 +171,17 @@ struct IntKeysTestSetup {
     return Status::OK();
   }
 
+  Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
+    CppType val;
+    RETURN_NOT_OK(result.Get<TypeTraits<KeyTypeWrapper::type>>(0, &val));
+    return VerifyIntRowKey(val, split_idx, row_idx);
+  }
+
+  Status VerifyRowKeyRaw(const uint8_t* raw_key, int split_idx, int row_idx) const {
+    CppType val = *reinterpret_cast<const CppType*>(raw_key);
+    return VerifyIntRowKey(val, split_idx, row_idx);
+  }
+
   int GetRowsPerTablet() const {
     return rows_per_tablet_;
   }
@@ -184,6 +201,20 @@ struct IntKeysTestSetup {
   int rows_per_tablet_;
 };
 
+struct ExpectedVals {
+  int8_t expected_int8_val;
+  int16_t expected_int16_val;
+  int32_t expected_int32_val;
+  int64_t expected_int64_val;
+  int64_t expected_timestamp_val;
+  string slice_content;
+  Slice expected_slice_val;
+  Slice expected_binary_val;
+  bool expected_bool_val;
+  float expected_float_val;
+  double expected_double_val;
+};
+
 // Integration that writes, scans and verifies all types.
 template <class TestSetup>
 class AllTypesItest : public KuduTest {
@@ -192,6 +223,7 @@ class AllTypesItest : public KuduTest {
     if (AllowSlowTests()) {
       FLAGS_num_rows_per_tablet = 10000;
     }
+    SeedRandom();
     setup_ = TestSetup();
   }
 
@@ -315,53 +347,66 @@ class AllTypesItest : public KuduTest {
     projection->push_back("bool_val");
   }
 
+  ExpectedVals GetExpectedValsForRow(int split_idx, int row_idx) {
+    ExpectedVals vals;
+    int64_t expected_int_val = (split_idx * setup_.GetRowsPerTablet()) + row_idx;
+    vals.expected_int8_val = static_cast<int8_t>(expected_int_val);
+    vals.expected_int16_val = static_cast<int16_t>(expected_int_val);
+    vals.expected_int32_val = static_cast<int32_t>(expected_int_val);
+    vals.expected_int64_val = expected_int_val;
+    vals.expected_timestamp_val = expected_int_val;
+    vals.slice_content = strings::Substitute("hello $0", expected_int_val);
+    vals.expected_slice_val = Slice(vals.slice_content);
+    vals.expected_binary_val = Slice(vals.slice_content);
+    vals.expected_bool_val = expected_int_val % 2;
+    vals.expected_float_val = expected_int_val;
+    vals.expected_double_val = expected_int_val;
+    return vals;
+  }
+
   void VerifyRow(const KuduRowResult& row, int split_idx, int row_idx) {
     ASSERT_OK(setup_.VerifyRowKey(row, split_idx, row_idx));
 
-    int64_t expected_int_val = (split_idx * setup_.GetRowsPerTablet()) + row_idx;
+    ExpectedVals vals = GetExpectedValsForRow(split_idx, row_idx);
+
     int8_t int8_val;
     ASSERT_OK(row.GetInt8("int8_val", &int8_val));
-    ASSERT_EQ(int8_val, static_cast<int8_t>(expected_int_val));
+    ASSERT_EQ(int8_val, vals.expected_int8_val);
     int16_t int16_val;
     ASSERT_OK(row.GetInt16("int16_val", &int16_val));
-    ASSERT_EQ(int16_val, static_cast<int16_t>(expected_int_val));
+    ASSERT_EQ(int16_val, vals.expected_int16_val);
     int32_t int32_val;
     ASSERT_OK(row.GetInt32("int32_val", &int32_val));
-    ASSERT_EQ(int32_val, static_cast<int32_t>(expected_int_val));
+    ASSERT_EQ(int32_val, vals.expected_int32_val);
     int64_t int64_val;
     ASSERT_OK(row.GetInt64("int64_val", &int64_val));
-    ASSERT_EQ(int64_val, expected_int_val);
+    ASSERT_EQ(int64_val, vals.expected_int64_val);
     int64_t timestamp_val;
     ASSERT_OK(row.GetUnixTimeMicros("timestamp_val", &timestamp_val));
-    ASSERT_EQ(timestamp_val, expected_int_val);
-
-    string content = strings::Substitute("hello $0", expected_int_val);
-    Slice expected_slice_val(content);
+    ASSERT_EQ(timestamp_val, vals.expected_timestamp_val);
     Slice string_val;
     ASSERT_OK(row.GetString("string_val", &string_val));
-    ASSERT_EQ(string_val, expected_slice_val);
+    ASSERT_EQ(string_val, vals.expected_slice_val);
     Slice binary_val;
     ASSERT_OK(row.GetBinary("binary_val", &binary_val));
-    ASSERT_EQ(binary_val, expected_slice_val);
-
-    bool expected_bool_val = expected_int_val % 2;
+    ASSERT_EQ(binary_val, vals.expected_binary_val);
     bool bool_val;
     ASSERT_OK(row.GetBool("bool_val", &bool_val));
-    ASSERT_EQ(bool_val, expected_bool_val);
-
-    double expected_double_val = expected_int_val;
+    ASSERT_EQ(bool_val, vals.expected_bool_val);
     double double_val;
     ASSERT_OK(row.GetDouble("double_val", &double_val));
-    ASSERT_EQ(double_val, expected_double_val);
+    ASSERT_EQ(double_val, vals.expected_double_val);
     float float_val;
     ASSERT_OK(row.GetFloat("float_val", &float_val));
-    ASSERT_EQ(float_val, static_cast<float>(double_val));
+    ASSERT_EQ(float_val, vals.expected_float_val);
   }
 
-  Status VerifyRows() {
-    vector<string> projection;
-    SetupProjection(&projection);
+  typedef std::function<Status (KuduScanner* scanner)> ScannerSetup;
+  typedef std::function<void (const KuduScanBatch& batch,
+                              int num_tablet,
+                              int* total_rows_in_tablet)> RowVerifier;
 
+  Status VerifyRows(const ScannerSetup& scanner_setup, const RowVerifier& verifier) {
     int total_rows = 0;
     // Scan a single tablet and make sure it has the rows we expect in the amount we
     // expect.
@@ -380,23 +425,19 @@ class AllTypesItest : public KuduTest {
         high_split = split.ToString();
       }
 
-      RETURN_NOT_OK(scanner.SetProjectedColumns(projection));
+      RETURN_NOT_OK(scanner_setup(&scanner));
       RETURN_NOT_OK(scanner.SetBatchSizeBytes(KMaxBatchSize));
       RETURN_NOT_OK(scanner.SetFaultTolerant());
       RETURN_NOT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+
       RETURN_NOT_OK(scanner.Open());
       LOG(INFO) << "Scanning tablet: [" << low_split << ", " << high_split << ")";
 
       int total_rows_in_tablet = 0;
       while (scanner.HasMoreRows()) {
-        vector<KuduRowResult> rows;
-        RETURN_NOT_OK(scanner.NextBatch(&rows));
-
-        for (int j = 0; j < rows.size(); ++j) {
-          VLOG(1) << "Scanned row: " << rows[j].ToString();
-          VerifyRow(rows[j], i, total_rows_in_tablet + j);
-        }
-        total_rows_in_tablet += rows.size();
+        KuduScanBatch batch;
+        scanner.NextBatch(&batch);
+        verifier(batch, i, &total_rows_in_tablet);
       }
       CHECK_EQ(total_rows_in_tablet, setup_.GetRowsPerTablet());
       total_rows += total_rows_in_tablet;
@@ -405,7 +446,7 @@ class AllTypesItest : public KuduTest {
     return Status::OK();
   }
 
-  void RunTest() {
+  void RunTest(const ScannerSetup& scanner_setup, const RowVerifier& verifier) {
     ASSERT_OK(CreateCluster());
     ASSERT_OK(CreateTable());
     ASSERT_OK(InsertRows());
@@ -414,7 +455,7 @@ class AllTypesItest : public KuduTest {
     // Verify always passes.
     NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
     // Check that the inserted data matches what we thought we inserted.
-    ASSERT_OK(VerifyRows());
+    ASSERT_OK(VerifyRows(scanner_setup, verifier));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -450,9 +491,121 @@ typedef ::testing::Types<IntKeysTestSetup<KeyTypeWrapper<INT8> >,
 TYPED_TEST_CASE(AllTypesItest, KeyTypes);
 
 TYPED_TEST(AllTypesItest, TestAllKeyTypes) {
-  this->RunTest();
+  vector<string> projection;
+  this->SetupProjection(&projection);
+  auto scanner_setup = [&](KuduScanner* scanner) {
+    return scanner->SetProjectedColumnNames(projection);
+  };
+  auto row_verifier = [&](const KuduScanBatch& batch, int num_tablet, int* total_rows_in_tablet) {
+    for (int i = 0; i < batch.NumRows(); i++) {
+      NO_FATALS(this->VerifyRow(batch.Row(i), num_tablet, *total_rows_in_tablet + i));
+    }
+    *total_rows_in_tablet += batch.NumRows();
+  };
+
+  this->RunTest(scanner_setup, row_verifier);
+}
+
+TYPED_TEST(AllTypesItest, TestTimestampPadding) {
+  vector<string> projection;
+  this->SetupProjection(&projection);
+  auto scanner_setup = [&](KuduScanner* scanner) -> Status {
+    // Each time this function is called we shuffle the projection to get the chance
+    // of having timestamps in different places of the projection and before/after
+    // different types.
+    std::random_shuffle(projection.begin(), projection.end());
+    RETURN_NOT_OK(scanner->SetProjectedColumnNames(projection));
+    int row_format_flags = KuduScanner::NO_FLAGS;
+    row_format_flags |= KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES;
+    return scanner->SetRowFormatFlags(row_format_flags);
+  };
+
+  auto row_verifier = [&](const KuduScanBatch& batch, int num_tablet, int* total_rows_in_tablet) {
+    // Timestamps are padded to 16 bytes.
+    int kPaddedTimestampSize = 16;
+
+    // Calculate the projection size, each of the column offsets and the size of the null bitmap.
+    const KuduSchema* schema = batch.projection_schema();
+    vector<int> projection_offsets;
+    int row_stride = 0;
+    int num_nullable_cols = 0;
+    for (int i = 0; i < schema->num_columns(); i++) {
+      KuduColumnSchema col_schema = schema->Column(i);
+      if (col_schema.is_nullable()) num_nullable_cols++;
+      switch (col_schema.type()) {
+        case KuduColumnSchema::UNIXTIME_MICROS:
+          projection_offsets.push_back(kPaddedTimestampSize);
+          row_stride += kPaddedTimestampSize;
+          break;
+        default:
+          int col_size = GetTypeInfo(ToInternalDataType(col_schema.type()))->size();
+          projection_offsets.push_back(col_size);
+          row_stride += col_size;
+      }
+    }
+
+    int null_bitmap_size = BitmapSize(num_nullable_cols);
+    row_stride += null_bitmap_size;
+
+    Slice direct_data = batch.direct_data();
+
+    ASSERT_EQ(direct_data.size(), row_stride * batch.NumRows());
+
+    const uint8_t* row_data = direct_data.data();
+
+    for (int i = 0; i < batch.NumRows(); i++) {
+      for (int j = 0; j < schema->num_columns(); j++) {
+        KuduColumnSchema col_schema = schema->Column(j);
+
+        if (schema->Column(j).name() == "key") {
+          ASSERT_OK(this->setup_.VerifyRowKeyRaw(row_data, num_tablet, *total_rows_in_tablet + i));
+        } else {
+          ExpectedVals vals = this->GetExpectedValsForRow(num_tablet, *total_rows_in_tablet + i);
+
+          switch (col_schema.type()) {
+            case KuduColumnSchema::INT8:
+              ASSERT_EQ(*reinterpret_cast<const int8_t*>(row_data), vals.expected_int8_val);
+              break;
+            case KuduColumnSchema::INT16:
+              ASSERT_EQ(*reinterpret_cast<const int16_t*>(row_data), vals.expected_int16_val);
+              break;
+            case KuduColumnSchema::INT32:
+              ASSERT_EQ(*reinterpret_cast<const int32_t*>(row_data), vals.expected_int32_val);
+              break;
+            case KuduColumnSchema::INT64:
+              ASSERT_EQ(*reinterpret_cast<const int64_t*>(row_data), vals.expected_int64_val);
+              break;
+            case KuduColumnSchema::UNIXTIME_MICROS:
+              ASSERT_EQ(*reinterpret_cast<const int64_t*>(row_data), vals.expected_timestamp_val);
+              break;
+            case KuduColumnSchema::STRING:
+              ASSERT_EQ(*reinterpret_cast<const Slice*>(row_data), vals.expected_slice_val);
+              break;
+            case KuduColumnSchema::BINARY:
+              ASSERT_EQ(*reinterpret_cast<const Slice*>(row_data), vals.expected_binary_val);
+              break;
+            case KuduColumnSchema::BOOL:
+              ASSERT_EQ(*reinterpret_cast<const bool*>(row_data), vals.expected_bool_val);
+              break;
+            case KuduColumnSchema::FLOAT:
+              ASSERT_EQ(*reinterpret_cast<const float*>(row_data), vals.expected_float_val);
+              break;
+            case KuduColumnSchema::DOUBLE:
+              ASSERT_EQ(*reinterpret_cast<const double*>(row_data), vals.expected_double_val);
+              break;
+            default:
+              LOG(FATAL) << "Unexpected type: " << col_schema.type();
+          }
+        }
+        row_data += projection_offsets[j];
+      }
+      row_data += null_bitmap_size;
+    }
+    *total_rows_in_tablet += batch.NumRows();
+  };
+
+  this->RunTest(scanner_setup, row_verifier);
 }
 
 } // namespace client
 } // namespace kudu
-

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/tools/tool_action_remote_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index 9eb1674..0eee76e 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -138,6 +138,7 @@ class ReplicaDumper {
       RETURN_NOT_OK(results.Reset(&rpc,
                                   &schema,
                                   &client_schema,
+                                  client::KuduScanner::NO_FLAGS,
                                   make_gscoped_ptr(resp.release_data())));
       vector<KuduRowResult> rows;
       results.ExtractRows(&rows);

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/tserver/scanners-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners-test.cc b/src/kudu/tserver/scanners-test.cc
index 68cfe4c..5541bc3 100644
--- a/src/kudu/tserver/scanners-test.cc
+++ b/src/kudu/tserver/scanners-test.cc
@@ -21,6 +21,7 @@
 #include <gtest/gtest.h>
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/scanner_metrics.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/test_util.h"
 
@@ -40,8 +41,8 @@ TEST(ScannersTest, TestManager) {
 
   // Create two scanners, make sure their ids are different.
   SharedScanner s1, s2;
-  mgr.NewScanner(null_peer, "", &s1);
-  mgr.NewScanner(null_peer, "", &s2);
+  mgr.NewScanner(null_peer, "", RowFormatFlags::NO_FLAGS, &s1);
+  mgr.NewScanner(null_peer, "", RowFormatFlags::NO_FLAGS, &s2);
   ASSERT_NE(s1->id(), s2->id());
 
   // Check that they're both registered.
@@ -69,8 +70,8 @@ TEST(ScannerTest, TestExpire) {
   MetricRegistry registry;
   ScannerManager mgr(METRIC_ENTITY_server.Instantiate(&registry, "test"));
   SharedScanner s1, s2;
-  mgr.NewScanner(null_peer, "", &s1);
-  mgr.NewScanner(null_peer, "", &s2);
+  mgr.NewScanner(null_peer, "", RowFormatFlags::NO_FLAGS, &s1);
+  mgr.NewScanner(null_peer, "", RowFormatFlags::NO_FLAGS, &s2);
   SleepFor(MonoDelta::FromMilliseconds(200));
   s2->UpdateAccessTime();
   mgr.RemoveExpiredScanners();

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/tserver/scanners.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 43d8144..ec0f77c 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -108,6 +108,7 @@ ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const str
 
 void ScannerManager::NewScanner(const scoped_refptr<TabletPeer>& tablet_peer,
                                 const std::string& requestor_string,
+                                uint64_t row_format_flags,
                                 SharedScanner* scanner) {
   // Keep trying to generate a unique ID until we get one.
   bool success = false;
@@ -117,7 +118,11 @@ void ScannerManager::NewScanner(const scoped_refptr<TabletPeer>& tablet_peer,
     // just retry until we avoid a collision. Alternatively we could
     // verify that the requestor userid does not change mid-scan.
     string id = oid_generator_.Next();
-    scanner->reset(new Scanner(id, tablet_peer, requestor_string, metrics_.get()));
+    scanner->reset(new Scanner(id,
+                               tablet_peer,
+                               requestor_string,
+                               metrics_.get(),
+                               row_format_flags));
 
     ScannerMapStripe& stripe = GetStripeByScannerId(id);
     std::lock_guard<RWMutex> l(stripe.lock_);
@@ -188,14 +193,16 @@ void ScannerManager::RemoveExpiredScanners() {
 const std::string Scanner::kNullTabletId = "null tablet";
 
 Scanner::Scanner(string id, const scoped_refptr<TabletPeer>& tablet_peer,
-                 string requestor_string, ScannerMetrics* metrics)
+                 string requestor_string, ScannerMetrics* metrics,
+                 uint64_t row_format_flags)
     : id_(std::move(id)),
       tablet_peer_(tablet_peer),
       requestor_string_(std::move(requestor_string)),
       call_seq_id_(0),
       start_time_(MonoTime::Now()),
       metrics_(metrics),
-      arena_(1024, 1024 * 1024) {
+      arena_(1024, 1024 * 1024),
+      row_format_flags_(row_format_flags) {
   UpdateAccessTime();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/tserver/scanners.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index 370d5bc..1204860 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -73,6 +73,7 @@ class ScannerManager {
   // Create a new scanner with a unique ID, inserting it into the map.
   void NewScanner(const scoped_refptr<tablet::TabletPeer>& tablet_peer,
                   const std::string& requestor_string,
+                  uint64_t row_format_flags,
                   SharedScanner* scanner);
 
   // Lookup the given scanner by its ID.
@@ -169,7 +170,8 @@ class Scanner {
  public:
   Scanner(std::string id,
           const scoped_refptr<tablet::TabletPeer>& tablet_peer,
-          std::string requestor_string, ScannerMetrics* metrics);
+          std::string requestor_string, ScannerMetrics* metrics,
+          uint64_t row_format_flags);
   ~Scanner();
 
   // Attach an actual iterator and a ScanSpec to this Scanner.
@@ -272,6 +274,10 @@ class Scanner {
     already_reported_stats_ = stats;
   }
 
+  uint64_t row_format_flags() const {
+    return row_format_flags_;
+  }
+
  private:
   friend class ScannerManager;
 
@@ -323,6 +329,9 @@ class Scanner {
   // response.
   Arena arena_;
 
+  // Bitset of RowFormatFlags the client requested for this scan; 0 (NO_FLAGS) if none.
+  const uint64_t row_format_flags_;
+
   DISALLOW_COPY_AND_ASSIGN(Scanner);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 3fa9abb..f8a0f4f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -337,6 +337,15 @@ class ScanResultCollector {
 
   // Return the number of rows actually returned to the client.
   virtual int64_t NumRowsReturned() const = 0;
+
+  // Sets row format flags on the ScanResultCollector.
+  //
+  // This is a setter instead of a constructor argument passed to specific
+  // collector implementations because, currently, the collector is built
+  // before the request is decoded and checked for 'row_format_flags'.
+  //
+  // Does nothing by default.
+  virtual void set_row_format_flags(uint64_t /* row_format_flags */) {}
 };
 
 namespace {
@@ -370,38 +379,46 @@ void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
 // server-side scan and thus never need to return the actual data.)
 class ScanResultCopier : public ScanResultCollector {
  public:
-  ScanResultCopier(RowwiseRowBlockPB* rowblock_pb, faststring* rows_data, faststring* indirect_data)
+  ScanResultCopier(RowwiseRowBlockPB* rowblock_pb,
+                   faststring* rows_data,
+                   faststring* indirect_data)
       : rowblock_pb_(DCHECK_NOTNULL(rowblock_pb)),
         rows_data_(DCHECK_NOTNULL(rows_data)),
         indirect_data_(DCHECK_NOTNULL(indirect_data)),
         blocks_processed_(0),
-        num_rows_returned_(0) {
-  }
+        num_rows_returned_(0),
+        pad_unixtime_micros_to_16_bytes_(false) {}
 
-  virtual void HandleRowBlock(const Schema* client_projection_schema,
-                              const RowBlock& row_block) OVERRIDE {
+  void HandleRowBlock(const Schema* client_projection_schema,
+                      const RowBlock& row_block) override {
     blocks_processed_++;
     num_rows_returned_ += row_block.selection_vector()->CountSelected();
     SerializeRowBlock(row_block, rowblock_pb_, client_projection_schema,
-                      rows_data_, indirect_data_);
+                      rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
     SetLastRow(row_block, &last_primary_key_);
   }
 
-  virtual int BlocksProcessed() const OVERRIDE { return blocks_processed_; }
+  int BlocksProcessed() const override { return blocks_processed_; }
 
   // Returns number of bytes buffered to return.
-  virtual int64_t ResponseSize() const OVERRIDE {
+  int64_t ResponseSize() const override {
     return rows_data_->size() + indirect_data_->size();
   }
 
-  virtual const faststring& last_primary_key() const OVERRIDE {
+  const faststring& last_primary_key() const override {
     return last_primary_key_;
   }
 
-  virtual int64_t NumRowsReturned() const OVERRIDE {
+  int64_t NumRowsReturned() const override {
     return num_rows_returned_;
   }
 
+  void set_row_format_flags(uint64_t row_format_flags) override {
+    if (row_format_flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
+      pad_unixtime_micros_to_16_bytes_ = true;
+    }
+  }
+
  private:
   RowwiseRowBlockPB* const rowblock_pb_;
   faststring* const rows_data_;
@@ -409,6 +426,7 @@ class ScanResultCopier : public ScanResultCollector {
   int blocks_processed_;
   int64_t num_rows_returned_;
   faststring last_primary_key_;
+  bool pad_unixtime_micros_to_16_bytes_;
 
   DISALLOW_COPY_AND_ASSIGN(ScanResultCopier);
 };
@@ -1264,7 +1282,13 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
 }
 
 bool TabletServiceImpl::SupportsFeature(uint32_t feature) const {
-  return feature == TabletServerFeatures::COLUMN_PREDICATES;
+  switch (feature) {
+    case TabletServerFeatures::COLUMN_PREDICATES:
+    case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
+      return true;
+    default:
+      return false;
+  }
 }
 
 void TabletServiceImpl::Shutdown() {
@@ -1475,6 +1499,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
   SharedScanner scanner;
   server_->scanner_manager()->NewScanner(tablet_peer,
                                          rpc_context->requestor_string(),
+                                         scan_pb.row_format_flags(),
                                          &scanner);
 
   // If we early-exit out of this function, automatically unregister
@@ -1690,6 +1715,9 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
     }
   }
 
+  // Set the row format flags on the ScanResultCollector.
+  result_collector->set_row_format_flags(scanner->row_format_flags());
+
   // If we early-exit out of this function, automatically unregister the scanner.
   ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75333640/src/kudu/tserver/tserver.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 2fc8184..06fa96b 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -207,6 +207,11 @@ message ColumnRangePredicateListPB {
   repeated ColumnRangePredicatePB range_predicates = 1;
 }
 
+enum RowFormatFlags {
+  NO_FLAGS = 0;
+  PAD_UNIX_TIME_MICROS_TO_16_BYTES = 1;
+}
+
 message NewScanRequestPB {
   // The tablet to scan.
   required bytes tablet_id = 1;
@@ -261,6 +266,16 @@ message NewScanRequestPB {
   // attempt. If set, this will take precedence over the `start_primary_key`
   // field, and functions as an exclusive start primary key.
   optional bytes last_primary_key = 12 [(kudu.REDACT) = true];
+
+  // Row format flags.
+  //
+  // The client may pass "row format modifier" flags that change the way the server encodes
+  // the returned row data in some way. Only on/off modifiers are supported, which are encoded
+  // as a bitset in this uint64.
+  //
+  // The default value corresponds to RowFormatFlags::NO_FLAGS, which can't be set
+  // as the actual default since the types differ.
+  optional uint64 row_format_flags = 14 [default = 0];
 }
 
 // A scan request. Initially, it should specify a scan. Later on, you
@@ -368,4 +383,6 @@ message ScannerKeepAliveResponsePB {
 enum TabletServerFeatures {
   UNKNOWN_FEATURE = 0;
   COLUMN_PREDICATES = 1;
+  // Whether the server supports padding UNIXTIME_MICROS slots to 16 bytes.
+  PAD_UNIXTIME_MICROS_TO_16_BYTES = 2;
 }