You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/11/11 22:13:20 UTC

[impala] branch master updated: IMPALA-11537: Query option validation for numeric types

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a506c65 IMPALA-11537: Query option validation for numeric types
97a506c65 is described below

commit 97a506c6560e7a0c6c5ba7d098f07c6b04ef6cad
Author: Peter Rozsa <pr...@cloudera.com>
AuthorDate: Thu Sep 22 16:24:27 2022 +0200

    IMPALA-11537: Query option validation for numeric types
    
    This change adds a more generic approach to validate numeric
    query options and report parse and validation errors.
    Supported types: integers, floats, memory specifications.
    Range and bound validator helper functions are added to make
    validation unified on call sites.
    
    Testing:
     - Error messages got more generic, therefore the existing tests
       around query options are aligned to match them
    
    Change-Id: Ia7757b52393c094d2c661918d73cbfad7214f855
    Reviewed-on: http://gerrit.cloudera.org:8080/19096
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-option-parser.h               | 223 +++++
 be/src/service/query-options.cc                    | 927 ++++++++-------------
 be/src/util/parse-util.h                           |  21 +
 common/thrift/generate_error_codes.py              |   2 +
 .../functional-query/queries/QueryTest/set.test    |  10 +-
 5 files changed, 615 insertions(+), 568 deletions(-)

diff --git a/be/src/service/query-option-parser.h b/be/src/service/query-option-parser.h
new file mode 100644
index 000000000..b87246668
--- /dev/null
+++ b/be/src/service/query-option-parser.h
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_QUERY_OPTION_PARSER_H
+#define IMPALA_SERVICE_QUERY_OPTION_PARSER_H
+
+#include <functional>
+#include <sstream>
+#include <string>
+#include <type_traits>
+
+#include "common/status.h"
+#include "gen-cpp/ImpalaService_types.h"
+#include "gutil/strings/strip.h"
+#include "util/mem-info.h"
+#include "util/parse-util.h"
+#include "util/string-parser.h"
+
+namespace impala {
+
+template <typename T> // Stateless checks for already-parsed query option values.
+class QueryOptionValidator {
+  static_assert(std::is_arithmetic<T>::value || std::is_same<T, MemSpec>::value,
+      "Not supported type");
+  // Each check returns Status::OK() or an error Status that names the option.
+ public:
+  static inline Status InclusiveRange( // Accepts lower <= value <= upper.
+      TImpalaQueryOptions::type option, const T value, const T lower, const T upper) {
+    if (value < lower || value > upper) {
+      std::stringstream ss;
+      ss << value << " is not in range [" << lower << ", " << upper << "]";
+      return CreateValidationErrorStatus(option, ss.str());
+    }
+    return Status::OK();
+  }
+  // Accepts lower < value < upper (both bounds excluded).
+  static inline Status ExclusiveRange(
+      TImpalaQueryOptions::type option, const T value, const T lower, const T upper) {
+    if (value <= lower || value >= upper) {
+      std::stringstream ss;
+      ss << value << " is not in range (" << lower << ", " << upper << ")";
+      return CreateValidationErrorStatus(option, ss.str());
+    }
+    return Status::OK();
+  }
+  // Accepts value >= lower.
+  static inline Status InclusiveLowerBound(
+      TImpalaQueryOptions::type option, const T value, const T lower) {
+    if (value < lower) {
+      std::stringstream ss;
+      ss << "Value must be greater than or equal to " << lower
+         << ", actual value: " << value;
+      return CreateValidationErrorStatus(option, ss.str());
+    }
+    return Status::OK();
+  }
+  // Rejects value == other; any other value is accepted.
+  static inline Status NotEquals(
+      TImpalaQueryOptions::type option, const T value, const T other) {
+    if (value == other) {
+      std::stringstream ss;
+      ss << "Value can't be " << other << ", actual value: " << value;
+      return CreateValidationErrorStatus(option, ss.str());
+    }
+    return Status::OK();
+  }
+  // Accepts value >= T{0}.
+  static inline Status NonNegative(TImpalaQueryOptions::type option, const T value) {
+    if (value < T{0}) {
+      std::stringstream ss;
+      ss << "Value must be non-negative: " << value;
+      return CreateValidationErrorStatus(option, ss.str());
+    }
+    return Status::OK();
+  }
+  // Accepts values whose int64_t form passes BitUtil::IsPowerOf2().
+  static inline Status PowerOf2(TImpalaQueryOptions::type option, const T value) {
+    // Convert to int64_t so the same check also works for MemSpec.
+    int64_t int_value = ConvertToInt(value);
+    if (!impala::BitUtil::IsPowerOf2(int_value)) {
+      std::stringstream ss;
+      ss << "Value must be a power of two: " << value;
+      return CreateValidationErrorStatus(option, ss.str());
+    }
+    return Status::OK();
+  }
+
+ private:
+  // Conversion helpers for PowerOf2. Arithmetic arguments are implicitly converted to
+  // int64_t (a floating-point argument loses its fractional part).
+  static int64_t ConvertToInt(int64_t value) { return value; }
+  // MemSpec overload: returns the spec's numeric 'value' field.
+  static int64_t ConvertToInt(const MemSpec& mem_spec) { return mem_spec.value; }
+  // Builds the common "Invalid value for query option <name>: <message>" error.
+  static inline Status CreateValidationErrorStatus(
+      TImpalaQueryOptions::type option, const std::string& error_message) {
+    return Status(strings::Substitute("Invalid value for query option $0: $1",
+        _TImpalaQueryOptions_VALUES_TO_NAMES.at(option), error_message));
+  }
+};
+
+template <typename T> // Outcome of a single parse attempt.
+struct ParseResult {
+  const T value; // Parsed value; callers in this file read it only if is_parse_ok.
+  bool is_parse_ok; // False when the input string was rejected.
+};
+
+class QueryOptionParser { // Parses query option strings into typed values.
+ public:
+  template <typename T> // On failure returns an error and leaves '*result' untouched.
+  static Status Parse(
+      TImpalaQueryOptions::type option, const std::string& value, T* result) {
+    ParseResult<T> parse_result = ParseInternal<T>(value);
+    if (!parse_result.is_parse_ok) {
+      return CreateParseErrorStatus(option, value);
+    }
+    *result = parse_result.value;
+    return Status::OK();
+  }
+  // Parse() plus the InclusiveRange check; '*result' stays set if the check fails.
+  template <typename T>
+  static Status ParseAndCheckInclusiveRange(TImpalaQueryOptions::type option,
+      const std::string& value, const T lower, const T upper, T* result) {
+    Status status = Parse(option, value, result);
+    RETURN_IF_ERROR(status);
+    return QueryOptionValidator<T>::InclusiveRange(option, *result, lower, upper);
+  }
+  // Parse() plus the ExclusiveRange check; '*result' stays set if the check fails.
+  template <typename T>
+  static Status ParseAndCheckExclusiveRange(TImpalaQueryOptions::type option,
+      const std::string& value, const T lower, const T upper, T* result) {
+    Status status = Parse(option, value, result);
+    RETURN_IF_ERROR(status);
+    return QueryOptionValidator<T>::ExclusiveRange(option, *result, lower, upper);
+  }
+  // Parse() plus the InclusiveLowerBound check; '*result' stays set if the check fails.
+  template <typename T>
+  static Status ParseAndCheckInclusiveLowerBound(TImpalaQueryOptions::type option,
+      const std::string& value, const T lower, T* result) {
+    Status status = Parse(option, value, result);
+    RETURN_IF_ERROR(status);
+    return QueryOptionValidator<T>::InclusiveLowerBound(option, *result, lower);
+  }
+  // Parse() plus the NonNegative check; '*result' stays set if the check fails.
+  template <typename T>
+  static Status ParseAndCheckNonNegative(
+      TImpalaQueryOptions::type option, const std::string& value, T* result) {
+    Status status = Parse(option, value, result);
+    RETURN_IF_ERROR(status);
+    return QueryOptionValidator<T>::NonNegative(option, *result);
+  }
+
+ private:
+  static Status CreateParseErrorStatus( // Error carrying option name and raw input.
+      TImpalaQueryOptions::type option, const std::string& value) {
+    return Status(TErrorCode::QUERY_OPTION_PARSE_FAILED,
+        _TImpalaQueryOptions_VALUES_TO_NAMES.at(option), value);
+  }
+  // Integral types: delegates to StringParser::StringToInt.
+  template <typename Integral,
+      std::enable_if_t<std::is_integral<Integral>::value, Integral>* = nullptr>
+  static ParseResult<Integral> ParseInternal(const std::string& value) {
+    StringParser::ParseResult parse_result;
+    const auto result =
+        StringParser::StringToInt<Integral>(value.c_str(), value.size(), &parse_result);
+
+    return {result, parse_result == StringParser::PARSE_SUCCESS};
+  }
+  // Floating-point types: delegates to StringParser::StringToFloat.
+  template <typename Float,
+      std::enable_if_t<std::is_floating_point<Float>::value, Float>* = nullptr>
+  static ParseResult<Float> ParseInternal(const std::string& value) {
+    StringParser::ParseResult parse_result;
+    const auto result =
+        StringParser::StringToFloat<Float>(value.c_str(), value.size(), &parse_result);
+
+    return {result, parse_result == StringParser::PARSE_SUCCESS};
+  }
+  // MemSpec: rejects negative ParseMemSpec results and percentage specifications.
+  static ParseResult<MemSpec> ParseMemSpecInternal(const std::string& value) {
+    bool is_percent = false;
+    const int64_t mem_spec =
+        ParseUtil::ParseMemSpec(value, &is_percent, MemInfo::physical_mem());
+
+    bool is_parse_ok = mem_spec >= 0;
+
+    // Percent values are not allowed for query options
+    if (is_percent) {
+      is_parse_ok = false;
+    }
+
+    return {{mem_spec, true}, is_parse_ok};
+  }
+};
+
+template <> // Full specializations are not implicitly inline; mark it for header use.
+inline Status QueryOptionParser::Parse(
+    TImpalaQueryOptions::type option, const std::string& value, MemSpec* result) {
+  ParseResult<MemSpec> parse_result = ParseMemSpecInternal(value);
+  if (!parse_result.is_parse_ok) {
+    return CreateParseErrorStatus(option, value);
+  }
+  *result = parse_result.value;
+  return Status::OK();
+}
+
+} // namespace impala
+
+#endif
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index fc31006e9..a78a920ab 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -17,54 +17,39 @@
 
 #include "service/query-options.h"
 
-#include "runtime/runtime-filter.h"
-#include "util/debug-util.h"
-#include "util/mem-info.h"
-#include "util/parse-util.h"
-#include "util/string-parser.h"
-#include "exprs/timezone_db.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
+#include <limits>
 #include <sstream>
+
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/strip.h>
 #include <gutil/strings/substitute.h>
 
-#include "common/names.h"
+#include "exprs/timezone_db.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "runtime/runtime-filter.h"
+#include "service/query-option-parser.h"
+#include "util/debug-util.h"
+#include "util/parse-util.h"
 
 DECLARE_int64(min_buffer_size);
 
+using beeswax::TQueryOptionLevel;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
-using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
 using boost::algorithm::trim;
 using std::to_string;
-using beeswax::TQueryOptionLevel;
 using namespace impala;
 using namespace strings;
 
 DECLARE_int32(idle_session_timeout);
 
-// Utility method to wrap ParseUtil::ParseMemSpec() by returning a Status instead of an
-// int.
-static Status ParseMemValue(const string& value, const string& key, int64_t* result) {
-  bool is_percent;
-  *result = ParseUtil::ParseMemSpec(value, &is_percent, MemInfo::physical_mem());
-  if (*result < 0) {
-    return Status("Failed to parse " + key + " from '" + value + "'.");
-  }
-  if (is_percent) {
-    return Status("Invalid " + key + " with percent '" + value + "'.");
-  }
-  return Status::OK();
-}
-
-void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask,
-    TQueryOptions* dst) {
-  DCHECK_GT(mask.size(), _TImpalaQueryOptions_VALUES_TO_NAMES.size()) <<
-      "Size of QueryOptionsMask must be increased.";
-#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
+void impala::OverlayQueryOptions(
+    const TQueryOptions& src, const QueryOptionsMask& mask, TQueryOptions* dst) {
+  DCHECK_GT(mask.size(), _TImpalaQueryOptions_VALUES_TO_NAMES.size())
+      << "Size of QueryOptionsMask must be increased.";
+#define QUERY_OPT_FN(NAME, ENUM, LEVEL) \
   if (src.__isset.NAME && mask[TImpalaQueryOptions::ENUM]) dst->__set_##NAME(src.NAME);
 #define REMOVED_QUERY_OPT_FN(NAME, ENUM)
   QUERY_OPTS_TABLE
@@ -83,7 +68,7 @@ string PrintQueryOptionValue(const T& option) {
   return std::to_string(option);
 }
 
-const string& PrintQueryOptionValue(const std::string& option)  {
+const string& PrintQueryOptionValue(const std::string& option) {
   return option;
 }
 
@@ -96,8 +81,8 @@ const string PrintQueryOptionValue(const impala::TCompressionCodec& compression_
   }
 }
 
-std::ostream& impala::operator<<(std::ostream& out,
-    const std::set<impala::TRuntimeFilterType::type>& filter_types) {
+std::ostream& impala::operator<<(
+    std::ostream& out, const std::set<impala::TRuntimeFilterType::type>& filter_types) {
   bool first = true;
   for (const auto& t : filter_types) {
     if (!first) out << ",";
@@ -108,21 +93,21 @@ std::ostream& impala::operator<<(std::ostream& out,
 }
 
 const string PrintQueryOptionValue(
-    const set<impala::TRuntimeFilterType::type>& filter_types) {
-  stringstream val;
+    const std::set<impala::TRuntimeFilterType::type>& filter_types) {
+  std::stringstream val;
   val << filter_types;
   return val.str();
 }
 
-void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
-    map<string, string>* configuration) {
-#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
-  {\
-    if (query_options.__isset.NAME) { \
+void impala::TQueryOptionsToMap(
+    const TQueryOptions& query_options, std::map<string, string>* configuration) {
+#define QUERY_OPT_FN(NAME, ENUM, LEVEL)                                    \
+  {                                                                        \
+    if (query_options.__isset.NAME) {                                      \
       (*configuration)[#ENUM] = PrintQueryOptionValue(query_options.NAME); \
-    } else { \
-      (*configuration)[#ENUM] = ""; \
-    }\
+    } else {                                                               \
+      (*configuration)[#ENUM] = "";                                        \
+    }                                                                      \
   }
 #define REMOVED_QUERY_OPT_FN(NAME, ENUM) (*configuration)[#ENUM] = "";
   QUERY_OPTS_TABLE
@@ -134,13 +119,13 @@ void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
 static void ResetQueryOption(const int option, TQueryOptions* query_options) {
   const static TQueryOptions defaults;
   switch (option) {
-#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
-    case TImpalaQueryOptions::ENUM:\
-      query_options->__isset.NAME = defaults.__isset.NAME;\
-      query_options->NAME = defaults.NAME;\
-      break;
+#define QUERY_OPT_FN(NAME, ENUM, LEVEL)                  \
+  case TImpalaQueryOptions::ENUM:                        \
+    query_options->__isset.NAME = defaults.__isset.NAME; \
+    query_options->NAME = defaults.NAME;                 \
+    break;
 #define REMOVED_QUERY_OPT_FN(NAME, ENUM)
-  QUERY_OPTS_TABLE
+    QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
 #undef REMOVED_QUERY_OPT_FN
   }
@@ -153,20 +138,19 @@ static TQueryOptions DefaultQueryOptions() {
   return defaults;
 }
 
-inline bool operator!=(const TCompressionCodec& a,
-  const TCompressionCodec& b) {
+inline bool operator!=(const TCompressionCodec& a, const TCompressionCodec& b) {
   return (a.codec != b.codec || a.compression_level != b.compression_level);
 }
 
 string impala::DebugQueryOptions(const TQueryOptions& query_options) {
   const static TQueryOptions defaults = DefaultQueryOptions();
   int i = 0;
-  stringstream ss;
-#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
-  if (query_options.__isset.NAME &&\
-      (!defaults.__isset.NAME || query_options.NAME != defaults.NAME)) {\
-    if (i++ > 0) ss << ",";\
-    ss << #ENUM << "=" << query_options.NAME;\
+  std::stringstream ss;
+#define QUERY_OPT_FN(NAME, ENUM, LEVEL)                                     \
+  if (query_options.__isset.NAME                                            \
+      && (!defaults.__isset.NAME || query_options.NAME != defaults.NAME)) { \
+    if (i++ > 0) ss << ",";                                                 \
+    ss << #ENUM << "=" << query_options.NAME;                               \
   }
 #define REMOVED_QUERY_OPT_FN(NAME, ENUM)
   QUERY_OPTS_TABLE
@@ -178,7 +162,7 @@ string impala::DebugQueryOptions(const TQueryOptions& query_options) {
 // Returns the TImpalaQueryOptions enum for the given "key". Input is case insensitive.
 // Return -1 if the input is an invalid option.
 static int GetQueryOptionForKey(const string& key) {
-  map<int, const char*>::const_iterator itr =
+  std::map<int, const char*>::const_iterator itr =
       _TImpalaQueryOptions_VALUES_TO_NAMES.begin();
   for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
     if (iequals(key, (*itr).second)) {
@@ -192,8 +176,8 @@ static int GetQueryOptionForKey(const string& key) {
 static bool IsRemovedQueryOption(const string& key) {
 #define QUERY_OPT_FN(NAME, ENUM, LEVEL)
 #define REMOVED_QUERY_OPT_FN(NAME, ENUM) \
-  if (iequals(key, #NAME)) { \
-    return true; \
+  if (iequals(key, #NAME)) {             \
+    return true;                         \
   }
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
@@ -211,10 +195,14 @@ static bool IsTrue(const string& value) {
 // configuration.
 Status impala::SetQueryOption(const string& key, const string& value,
     TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) {
-  int option = GetQueryOptionForKey(key);
-  if (option < 0) {
+  int option_int = GetQueryOptionForKey(key);
+  if (option_int < 0) {
     return Status(Substitute("Invalid query option: $0", key));
-  } else if (value == "") {
+  }
+
+  TImpalaQueryOptions::type option = static_cast<TImpalaQueryOptions::type>(option_int);
+
+  if (value.empty()) {
     ResetQueryOption(option, query_options);
     if (set_query_options_mask != nullptr) {
       DCHECK_LT(option, set_query_options_mask->size());
@@ -222,48 +210,55 @@ Status impala::SetQueryOption(const string& key, const string& value,
     }
   } else {
     switch (option) {
-      case TImpalaQueryOptions::ABORT_ON_ERROR:
+      case TImpalaQueryOptions::ABORT_ON_ERROR: {
         query_options->__set_abort_on_error(IsTrue(value));
         break;
-      case TImpalaQueryOptions::MAX_ERRORS:
-        query_options->__set_max_errors(atoi(value.c_str()));
+      }
+      case TImpalaQueryOptions::MAX_ERRORS: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        query_options->__set_max_errors(int32_t_val);
         break;
-      case TImpalaQueryOptions::DISABLE_CODEGEN:
+      }
+      case TImpalaQueryOptions::DISABLE_CODEGEN: {
         query_options->__set_disable_codegen(IsTrue(value));
         break;
+      };
       case TImpalaQueryOptions::BATCH_SIZE: {
-        StringParser::ParseResult status;
-        int val = StringParser::StringToInt<int>(value.c_str(),
-            static_cast<int>(value.size()), &status);
-        if (status != StringParser::PARSE_SUCCESS || val < 0 || val > 65536) {
-          return Status(Substitute("Invalid batch size '$0'. Valid sizes are in"
-              "[0, 65536]", value));
-        }
-        query_options->__set_batch_size(val);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<int32_t>(
+            option, value, 0, 65536, &int32_t_val));
+        query_options->__set_batch_size(int32_t_val);
         break;
-      }
+      };
       case TImpalaQueryOptions::MEM_LIMIT: {
-        // Parse the mem limit spec and validate it.
-        int64_t bytes_limit;
-        RETURN_IF_ERROR(ParseMemValue(value, "query memory limit", &bytes_limit));
-        query_options->__set_mem_limit(bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_mem_limit(mem_spec_val.value);
         break;
-      }
-      case TImpalaQueryOptions::NUM_NODES:
-        query_options->__set_num_nodes(atoi(value.c_str()));
+      };
+      case TImpalaQueryOptions::NUM_NODES: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        query_options->__set_num_nodes(int32_t_val);
         break;
+      };
       case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH: {
-        int64_t scan_length = 0;
-        RETURN_IF_ERROR(ParseMemValue(value, "scan range length", &scan_length));
-        query_options->__set_max_scan_range_length(scan_length);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_max_scan_range_length(mem_spec_val.value);
         break;
       }
-      case TImpalaQueryOptions::NUM_SCANNER_THREADS:
-        query_options->__set_num_scanner_threads(atoi(value.c_str()));
+      case TImpalaQueryOptions::NUM_SCANNER_THREADS: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        query_options->__set_num_scanner_threads(int32_t_val);
         break;
-      case TImpalaQueryOptions::DEBUG_ACTION:
-        query_options->__set_debug_action(value.c_str());
+      };
+      case TImpalaQueryOptions::DEBUG_ACTION: {
+        query_options->__set_debug_action(value);
         break;
+      };
       case TImpalaQueryOptions::COMPRESSION_CODEC: {
         THdfsCompression::type enum_type;
         int compression_level;
@@ -277,58 +272,53 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_compression_codec(compression_codec);
         break;
       }
-      case TImpalaQueryOptions::HBASE_CACHING:
-        query_options->__set_hbase_caching(atoi(value.c_str()));
+      case TImpalaQueryOptions::HBASE_CACHING: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        query_options->__set_hbase_caching(int32_t_val);
         break;
-      case TImpalaQueryOptions::HBASE_CACHE_BLOCKS:
+      };
+      case TImpalaQueryOptions::HBASE_CACHE_BLOCKS: {
         query_options->__set_hbase_cache_blocks(IsTrue(value));
         break;
+      };
       case TImpalaQueryOptions::PARQUET_FILE_SIZE: {
-        int64_t file_size;
-        RETURN_IF_ERROR(ParseMemValue(value, "parquet file size", &file_size));
-        if (file_size > numeric_limits<int32_t>::max()) {
-          // Do not allow values greater than or equal to 2GB since hdfsOpenFile() from
-          // the HDFS API gets an int32 blocksize parameter (see HDFS-8949).
-          stringstream ss;
-          ss << "The PARQUET_FILE_SIZE query option must be less than 2GB.";
-          return Status(ss.str());
-        } else {
-          query_options->__set_parquet_file_size(file_size);
-        }
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<MemSpec>(
+            option, value, {0}, {numeric_limits<int32_t>::max()}, &mem_spec_val));
+        query_options->__set_parquet_file_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::EXPLAIN_LEVEL: {
         TExplainLevel::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(value, "explain level",
-            _TExplainLevel_VALUES_TO_NAMES, &enum_type));
+        RETURN_IF_ERROR(GetThriftEnum(
+            value, "explain level", _TExplainLevel_VALUES_TO_NAMES, &enum_type));
         query_options->__set_explain_level(enum_type);
         break;
       }
-      case TImpalaQueryOptions::SYNC_DDL:
+      case TImpalaQueryOptions::SYNC_DDL: {
         query_options->__set_sync_ddl(IsTrue(value));
         break;
-      case TImpalaQueryOptions::REQUEST_POOL:
+      };
+      case TImpalaQueryOptions::REQUEST_POOL: {
         query_options->__set_request_pool(value);
         break;
-      case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
+      };
+      case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN: {
         query_options->__set_disable_outermost_topn(IsTrue(value));
         break;
+      };
       case TImpalaQueryOptions::QUERY_TIMEOUT_S: {
-        StringParser::ParseResult result;
-        const int32_t timeout_s =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || timeout_s < 0) {
-          return Status(
-              Substitute("Invalid query timeout: '$0'. "
-                         "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_query_timeout_s(timeout_s);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_query_timeout_s(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::BUFFER_POOL_LIMIT: {
-        int64_t mem;
-        RETURN_IF_ERROR(ParseMemValue(value, "buffer pool limit", &mem));
-        query_options->__set_buffer_pool_limit(mem);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_buffer_pool_limit(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::APPX_COUNT_DISTINCT: {
@@ -339,33 +329,37 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_disable_unsafe_spills(IsTrue(value));
         break;
       }
-      case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD:
-        query_options->__set_exec_single_node_rows_threshold(atoi(value.c_str()));
+      case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        query_options->__set_exec_single_node_rows_threshold(int32_t_val);
         break;
-      case TImpalaQueryOptions::OPTIMIZE_PARTITION_KEY_SCANS:
+      };
+      case TImpalaQueryOptions::OPTIMIZE_PARTITION_KEY_SCANS: {
         query_options->__set_optimize_partition_key_scans(IsTrue(value));
         break;
-      case TImpalaQueryOptions::OPTIMIZE_SIMPLE_LIMIT:
+      };
+      case TImpalaQueryOptions::OPTIMIZE_SIMPLE_LIMIT: {
         query_options->__set_optimize_simple_limit(IsTrue(value));
         break;
+      };
       case TImpalaQueryOptions::REPLICA_PREFERENCE: {
-        map<int, const char *> valid_enums_values = {
-            {0, "CACHE_LOCAL"},
-            {2, "DISK_LOCAL"},
-            {4, "REMOTE"}
-        };
+        std::map<int, const char*> valid_enums_values = {
+            {0, "CACHE_LOCAL"}, {2, "DISK_LOCAL"}, {4, "REMOTE"}};
         TReplicaPreference::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(value, "replica memory distance preference",
-            valid_enums_values, &enum_type));
+        RETURN_IF_ERROR(GetThriftEnum(
+            value, "replica memory distance preference", valid_enums_values, &enum_type));
         query_options->__set_replica_preference(enum_type);
         break;
       }
-      case TImpalaQueryOptions::SCHEDULE_RANDOM_REPLICA:
+      case TImpalaQueryOptions::SCHEDULE_RANDOM_REPLICA: {
         query_options->__set_schedule_random_replica(IsTrue(value));
         break;
-      case TImpalaQueryOptions::DISABLE_STREAMING_PREAGGREGATIONS:
+      };
+      case TImpalaQueryOptions::DISABLE_STREAMING_PREAGGREGATIONS: {
         query_options->__set_disable_streaming_preaggregations(IsTrue(value));
         break;
+      };
       case TImpalaQueryOptions::RUNTIME_FILTER_MODE: {
         TRuntimeFilterMode::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(value, "runtime filter mode",
@@ -373,84 +367,70 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_runtime_filter_mode(enum_type);
         break;
       }
-      case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE:
-      case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE:
-      case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: {
-        int64_t size;
-        RETURN_IF_ERROR(ParseMemValue(value, "Bloom filter size", &size));
-        if (size < RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE ||
-            size > RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
-          return Status(Substitute("$0 is not a valid Bloom filter size for $1. "
-                  "Valid sizes are in [$2, $3].", value, PrintValue(
-                      static_cast<TImpalaQueryOptions::type>(option)),
-                  RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-                  RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
-        }
-        if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE
-            && size < FLAGS_min_buffer_size
+      case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE: {
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        RETURN_IF_ERROR(QueryOptionValidator<MemSpec>::InclusiveRange(option,
+            mem_spec_val, {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE},
+            {RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}));
+        if (mem_spec_val.value < FLAGS_min_buffer_size
             // last condition is to unblock the highly improbable case where the
             // min_buffer_size is greater than RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE.
             && FLAGS_min_buffer_size <= RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
           return Status(Substitute("$0 should not be less than $1 which is the minimum "
-              "buffer size that can be allocated by the buffer pool",
-              PrintValue(static_cast<TImpalaQueryOptions::type>(option)),
-              FLAGS_min_buffer_size));
-        }
-        if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) {
-          query_options->__set_runtime_bloom_filter_size(size);
-        } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) {
-          query_options->__set_runtime_filter_min_size(size);
-        } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE) {
-          query_options->__set_runtime_filter_max_size(size);
+                                   "buffer size that can be allocated by the buffer pool",
+              PrintValue(option), FLAGS_min_buffer_size));
         }
+        query_options->__set_runtime_filter_max_size(mem_spec_val.value);
+        break;
+      }
+      case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE: {
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<MemSpec>(option,
+            value, {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE},
+            {RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}, &mem_spec_val));
+        query_options->__set_runtime_filter_min_size(mem_spec_val.value);
+        break;
+      }
+      case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: {
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<MemSpec>(option,
+            value, {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE},
+            {RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}, &mem_spec_val));
+        query_options->__set_runtime_bloom_filter_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS: {
-        StringParser::ParseResult result;
-        const int32_t time_ms =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || time_ms < 0) {
-          return Status(
-              Substitute("$0 is not a valid wait time. Valid sizes are in [0, $1].",
-                  value, numeric_limits<int32_t>::max()));
-        }
-        query_options->__set_runtime_filter_wait_time_ms(time_ms);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, 0, &int32_t_val));
+        query_options->__set_runtime_filter_wait_time_ms(int32_t_val);
         break;
       }
-      case TImpalaQueryOptions::DISABLE_ROW_RUNTIME_FILTERING:
+      case TImpalaQueryOptions::DISABLE_ROW_RUNTIME_FILTERING: {
         query_options->__set_disable_row_runtime_filtering(IsTrue(value));
         break;
-      case TImpalaQueryOptions::MINMAX_FILTERING_LEVEL:
+      };
+      case TImpalaQueryOptions::MINMAX_FILTERING_LEVEL: {
         // Parse the enabled runtime filter types and validate it.
         TMinmaxFilteringLevel::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(value, "minmax filter level",
             _TMinmaxFilteringLevel_VALUES_TO_NAMES, &enum_type));
         query_options->__set_minmax_filtering_level(enum_type);
         break;
+      };
       case TImpalaQueryOptions::MINMAX_FILTER_THRESHOLD: {
-        StringParser::ParseResult status;
-        double val = StringParser::StringToFloat<double>(
-            value.c_str(), static_cast<int>(value.size()), &status);
-        if (status != StringParser::PARSE_SUCCESS || val < 0.0 || val > 1.0) {
-          return Status(
-              Substitute("Invalid minmax filter threshold '$0'. Valid sizes are in"
-                         "[0.0, 1.0]",
-                  value));
-        }
-        query_options->__set_minmax_filter_threshold(val);
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<double>(
+            option, value, 0.0, 1.0, &double_val));
+        query_options->__set_minmax_filter_threshold(double_val);
         break;
       }
       case TImpalaQueryOptions::MAX_NUM_RUNTIME_FILTERS: {
-        StringParser::ParseResult status;
-        int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status);
-        if (status != StringParser::PARSE_SUCCESS) {
-          return Status(Substitute("Invalid number of runtime filters: '$0'.", value));
-        }
-        if (val < 0) {
-          return Status(Substitute("Invalid number of runtime filters: '$0'. "
-              "Only positive values are allowed.", val));
-        }
-        query_options->__set_max_num_runtime_filters(val);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_max_num_runtime_filters(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::PARQUET_ANNOTATE_STRINGS_UTF8: {
@@ -472,15 +452,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::MT_DOP: {
-        StringParser::ParseResult result;
-        const int32_t dop =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 64) {
-          return Status(
-              Substitute("$0 is not valid for mt_dop. Valid values are in "
-                "[0, 64].", value));
-        }
-        query_options->__set_mt_dop(dop);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<int32_t>(
+            option, value, 0, 64, &int32_t_val));
+        query_options->__set_mt_dop(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: {
@@ -489,8 +464,8 @@ Status impala::SetQueryOption(const string& key, const string& value,
       }
       case TImpalaQueryOptions::PREFETCH_MODE: {
         TPrefetchMode::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(value, "prefetch mode",
-            _TPrefetchMode_VALUES_TO_NAMES, &enum_type));
+        RETURN_IF_ERROR(GetThriftEnum(
+            value, "prefetch mode", _TPrefetchMode_VALUES_TO_NAMES, &enum_type));
         query_options->__set_prefetch_mode(enum_type);
         break;
       }
@@ -503,10 +478,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         if (iequals(value, "-1")) {
           query_options->__set_scratch_limit(-1);
         } else {
-          int64_t bytes_limit;
-          RETURN_IF_ERROR(ParseMemValue(value, "Scratch space memory limit",
-              &bytes_limit));
-          query_options->__set_scratch_limit(bytes_limit);
+          MemSpec mem_spec_val{};
+          RETURN_IF_ERROR(
+              QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+          query_options->__set_scratch_limit(mem_spec_val.value);
         }
         break;
       }
@@ -533,7 +508,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::PARQUET_BLOOM_FILTER_WRITE: {
         TParquetBloomFilterWrite::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(value, "Parquet Bloom filter write",
-           _TParquetBloomFilterWrite_VALUES_TO_NAMES, &enum_type));
+            _TParquetBloomFilterWrite_VALUES_TO_NAMES, &enum_type));
         query_options->__set_parquet_bloom_filter_write(enum_type);
         break;
       }
@@ -553,83 +528,49 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD: {
-        StringParser::ParseResult status;
-        int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status);
-        if (status != StringParser::PARSE_SUCCESS) {
-          return Status(Substitute("Invalid threshold: '$0'.", value));
-        }
-        if (val < 0) {
-          return Status(Substitute(
-              "Invalid threshold: '$0'. Only positive values are allowed.", val));
-        }
-        query_options->__set_disable_codegen_rows_threshold(val);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_disable_codegen_rows_threshold(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::DEFAULT_SPILLABLE_BUFFER_SIZE: {
-        int64_t buffer_size_bytes;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "Default spillable buffer size", &buffer_size_bytes));
-        if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
-          return Status(
-              Substitute("Default spillable buffer size must be a power of two: $0",
-                  buffer_size_bytes));
-        }
-        if (buffer_size_bytes > SPILLABLE_BUFFER_LIMIT) {
-          return Status(Substitute(
-              "Default spillable buffer size must be less than or equal to: $0",
-              SPILLABLE_BUFFER_LIMIT));
-        }
-        query_options->__set_default_spillable_buffer_size(buffer_size_bytes);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        RETURN_IF_ERROR(QueryOptionValidator<MemSpec>::InclusiveRange(
+            option, mem_spec_val, {0}, {SPILLABLE_BUFFER_LIMIT}));
+        RETURN_IF_ERROR(QueryOptionValidator<MemSpec>::PowerOf2(option, mem_spec_val));
+        query_options->__set_default_spillable_buffer_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE: {
-        int64_t buffer_size_bytes;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "Minimum spillable buffer size", &buffer_size_bytes));
-        if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
-          return Status(
-              Substitute("Minimum spillable buffer size must be a power of two: $0",
-                  buffer_size_bytes));
-        }
-        if (buffer_size_bytes > SPILLABLE_BUFFER_LIMIT) {
-          return Status(Substitute(
-              "Minimum spillable buffer size must be less than or equal to: $0",
-              SPILLABLE_BUFFER_LIMIT));
-        }
-        query_options->__set_min_spillable_buffer_size(buffer_size_bytes);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        RETURN_IF_ERROR(QueryOptionValidator<MemSpec>::InclusiveRange(
+            option, mem_spec_val, {0}, {SPILLABLE_BUFFER_LIMIT}));
+        RETURN_IF_ERROR(QueryOptionValidator<MemSpec>::PowerOf2(option, mem_spec_val));
+        query_options->__set_min_spillable_buffer_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::MAX_ROW_SIZE: {
-        int64_t max_row_size_bytes;
-        RETURN_IF_ERROR(ParseMemValue(value, "Max row size", &max_row_size_bytes));
-        if (max_row_size_bytes <= 0 || max_row_size_bytes > ROW_SIZE_LIMIT) {
-          return Status(
-              Substitute("Invalid max row size of $0. Valid sizes are in [$1, $2]", value,
-                  1, ROW_SIZE_LIMIT));
-        }
-        query_options->__set_max_row_size(max_row_size_bytes);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<MemSpec>(
+            option, value, {1}, {ROW_SIZE_LIMIT}, &mem_spec_val));
+        query_options->__set_max_row_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::IDLE_SESSION_TIMEOUT: {
-        StringParser::ParseResult result;
-        const int32_t requested_timeout =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || requested_timeout < 0) {
-          return Status(
-              Substitute("Invalid idle session timeout: '$0'. "
-                         "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_idle_session_timeout(requested_timeout);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_idle_session_timeout(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::COMPUTE_STATS_MIN_SAMPLE_SIZE: {
-        int64_t min_sample_size;
-        RETURN_IF_ERROR(ParseMemValue(value, "Min sample size", &min_sample_size));
-        if (min_sample_size < 0) {
-          return Status(
-              Substitute("Min sample size must be greater or equal to zero: $0", value));
-        }
-        query_options->__set_compute_stats_min_sample_size(min_sample_size);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<MemSpec>(
+            option, value, &mem_spec_val));
+        query_options->__set_compute_stats_min_sample_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::COMPUTE_COLUMN_MINMAX_STATS: {
@@ -641,15 +582,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::EXEC_TIME_LIMIT_S: {
-        StringParser::ParseResult result;
-        const int32_t time_limit =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || time_limit < 0) {
-          return Status(
-              Substitute("Invalid query time limit: '$0'. "
-                         "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_exec_time_limit_s(time_limit);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_exec_time_limit_s(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS: {
@@ -657,36 +593,29 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION: {
-        int64_t bytes_limit;
-        RETURN_IF_ERROR(ParseMemValue(
-            value, "max memory estimate for admission", &bytes_limit));
-        query_options->__set_max_mem_estimate_for_admission(bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_max_mem_estimate_for_admission(mem_spec_val.value);
+        break;
+      }
+      case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, -1, &int32_t_val));
+        query_options->__set_thread_reservation_limit(int32_t_val);
         break;
       }
-      case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT:
       case TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT: {
-        // Parsing logic is identical for these two options.
-        StringParser::ParseResult status;
-        int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status);
-        if (status != StringParser::PARSE_SUCCESS) {
-          return Status(Substitute("Invalid thread count: '$0'.", value));
-        }
-        if (val < -1) {
-          return Status(Substitute("Invalid thread count: '$0'. "
-              "Only -1 and non-negative values are allowed.", val));
-        }
-        if (option == TImpalaQueryOptions::THREAD_RESERVATION_LIMIT) {
-          query_options->__set_thread_reservation_limit(val);
-        } else {
-          DCHECK_EQ(option, TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT);
-          query_options->__set_thread_reservation_aggregate_limit(val);
-        }
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, -1, &int32_t_val));
+        query_options->__set_thread_reservation_aggregate_limit(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::KUDU_READ_MODE: {
         TKuduReadMode::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(value, "Kudu read mode",
-            _TKuduReadMode_VALUES_TO_NAMES, &enum_type));
+        RETURN_IF_ERROR(GetThriftEnum(
+            value, "Kudu read mode", _TKuduReadMode_VALUES_TO_NAMES, &enum_type));
         query_options->__set_kudu_read_mode(enum_type);
         break;
       }
@@ -707,27 +636,22 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::SCAN_BYTES_LIMIT: {
-        int64_t bytes_limit;
-        RETURN_IF_ERROR(ParseMemValue(value, "query scan bytes limit", &bytes_limit));
-        query_options->__set_scan_bytes_limit(bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_scan_bytes_limit(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::CPU_LIMIT_S: {
-        StringParser::ParseResult result;
-        const int64_t cpu_limit_s =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || cpu_limit_s < 0) {
-          return Status(
-              Substitute("Invalid CPU limit: '$0'. "
-                         "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_cpu_limit_s(cpu_limit_s);
+        int64_t int64_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int64_t>(
+            option, value, &int64_t_val));
+        query_options->__set_cpu_limit_s(int64_t_val);
         break;
       }
       case TImpalaQueryOptions::TOPN_BYTES_LIMIT: {
-        int64_t topn_bytes_limit;
-        RETURN_IF_ERROR(ParseMemValue(value, "topn bytes limit", &topn_bytes_limit));
-        query_options->__set_topn_bytes_limit(topn_bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_topn_bytes_limit(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::CLIENT_IDENTIFIER: {
@@ -735,15 +659,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::RESOURCE_TRACE_RATIO: {
-        StringParser::ParseResult result;
-        const double val =
-            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1) {
-          return Status(Substitute("Invalid resource trace ratio: '$0'. "
-                                   "Only values from 0 to 1 are allowed.",
-              value));
-        }
-        query_options->__set_resource_trace_ratio(val);
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<double>(
+            option, value, 0.0, 1.0, &double_val));
+        query_options->__set_resource_trace_ratio(double_val);
         break;
       }
       case TImpalaQueryOptions::PLANNER_TESTCASE_MODE: {
@@ -751,34 +670,23 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::NUM_REMOTE_EXECUTOR_CANDIDATES: {
-        StringParser::ParseResult result;
-        const int64_t num_remote_executor_candidates =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS ||
-            num_remote_executor_candidates < 0 || num_remote_executor_candidates > 16) {
-          return Status(
-              Substitute("$0 is not valid for num_remote_executor_candidates. "
-                         "Valid values are in [0, 16].", value));
-        }
-        query_options->__set_num_remote_executor_candidates(
-            num_remote_executor_candidates);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<int32_t>(
+            option, value, 0, 16, &int32_t_val));
+        query_options->__set_num_remote_executor_candidates(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::NUM_ROWS_PRODUCED_LIMIT: {
-        StringParser::ParseResult result;
-        const int64_t num_rows_produced_limit =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || num_rows_produced_limit < 0) {
-          return Status(Substitute("Invalid rows returned limit: '$0'. "
-                                   "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_num_rows_produced_limit(num_rows_produced_limit);
+        int64_t int64_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int64_t>(
+            option, value, &int64_t_val));
+        query_options->__set_num_rows_produced_limit(int64_t_val);
         break;
       }
       case TImpalaQueryOptions::DEFAULT_FILE_FORMAT: {
         THdfsFileFormat::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(value, "default file format",
-            _THdfsFileFormat_VALUES_TO_NAMES, &enum_type));
+        RETURN_IF_ERROR(GetThriftEnum(
+            value, "default file format", _THdfsFileFormat_VALUES_TO_NAMES, &enum_type));
         query_options->__set_default_file_format(enum_type);
         break;
       }
@@ -798,13 +706,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::PARQUET_PAGE_ROW_COUNT_LIMIT: {
-        StringParser::ParseResult result;
-        const int32_t row_count_limit =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || row_count_limit <= 0) {
-          return Status("Parquet page row count limit must be a positive integer.");
-        }
-        query_options->__set_parquet_page_row_count_limit(row_count_limit);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, 1, &int32_t_val));
+        query_options->__set_parquet_page_row_count_limit(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::DISABLE_HDFS_NUM_ROWS_ESTIMATE: {
@@ -820,19 +725,15 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::MAX_RESULT_SPOOLING_MEM: {
-        int64_t max_result_spooling_mem;
-        RETURN_IF_ERROR(ParseMemValue(value, "max result spooling memory",
-            &max_result_spooling_mem));
-        query_options->__set_max_result_spooling_mem(
-            max_result_spooling_mem);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_max_result_spooling_mem(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::MAX_SPILLED_RESULT_SPOOLING_MEM: {
-        int64_t max_spilled_result_spooling_mem;
-        RETURN_IF_ERROR(ParseMemValue(value, "max spilled result spooling memory",
-            &max_spilled_result_spooling_mem));
-        query_options->__set_max_spilled_result_spooling_mem(
-            max_spilled_result_spooling_mem);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_max_spilled_result_spooling_mem(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::DEFAULT_TRANSACTIONAL_TYPE: {
@@ -843,29 +744,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT: {
-        StringParser::ParseResult result;
-        const int32_t statement_expression_limit =
-          StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS ||
-            statement_expression_limit < MIN_STATEMENT_EXPRESSION_LIMIT) {
-          return Status(Substitute("Invalid statement expression limit: $0 "
-              "Valid values are in [$1, $2]", value, MIN_STATEMENT_EXPRESSION_LIMIT,
-              std::numeric_limits<int32_t>::max()));
-        }
-        query_options->__set_statement_expression_limit(statement_expression_limit);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, MIN_STATEMENT_EXPRESSION_LIMIT, &int32_t_val));
+        query_options->__set_statement_expression_limit(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES: {
-        int64_t max_statement_length_bytes;
-        RETURN_IF_ERROR(ParseMemValue(value, "max statement length bytes",
-            &max_statement_length_bytes));
-        if (max_statement_length_bytes < MIN_MAX_STATEMENT_LENGTH_BYTES ||
-            max_statement_length_bytes > std::numeric_limits<int32_t>::max()) {
-          return Status(Substitute("Invalid maximum statement length: $0 "
-              "Valid values are in [$1, $2]", max_statement_length_bytes,
-              MIN_MAX_STATEMENT_LENGTH_BYTES, std::numeric_limits<int32_t>::max()));
-        }
-        query_options->__set_max_statement_length_bytes(max_statement_length_bytes);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<MemSpec>(option,
+            value, {MIN_MAX_STATEMENT_LENGTH_BYTES},
+            {std::numeric_limits<int32_t>::max()}, &mem_spec_val));
+        query_options->__set_max_statement_length_bytes(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::DISABLE_DATA_CACHE: {
@@ -877,15 +767,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS: {
-        StringParser::ParseResult result;
-        const int64_t requested_timeout =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || requested_timeout < 0) {
-          return Status(
-              Substitute("Invalid fetch rows timeout: '$0'. "
-                         "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_fetch_rows_timeout_ms(requested_timeout);
+        int64_t int64_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int64_t>(
+            option, value, &int64_t_val));
+        query_options->__set_fetch_rows_timeout_ms(int64_t_val);
         break;
       }
       case TImpalaQueryOptions::NOW_STRING: {
@@ -893,38 +778,27 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::PARQUET_OBJECT_STORE_SPLIT_SIZE: {
-        int64_t parquet_object_store_split_size;
-        RETURN_IF_ERROR(ParseMemValue(
-            value, "parquet object store split size", &parquet_object_store_split_size));
         // The MIN_SYNTHETIC_BLOCK_SIZE from HdfsPartition.java. HdfsScanNode.java forces
         // the block size to be greater than or equal to this value, so reject any
         // attempt to set PARQUET_OBJECT_STORE_SPLIT_SIZE to a value lower than
         // MIN_SYNTHETIC_BLOCK_SIZE.
-        int min_synthetic_block_size = 1024 * 1024;
-        if (parquet_object_store_split_size < min_synthetic_block_size) {
-          return Status(Substitute("Invalid parquet object store split size: '$0'. Must "
-                                   "be greater than or equal to '$1'.",
-              value, min_synthetic_block_size));
-        }
-        query_options->__set_parquet_object_store_split_size(
-            parquet_object_store_split_size);
+        constexpr int min_synthetic_block_size = 1024 * 1024;
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<MemSpec>(
+            option, value, {min_synthetic_block_size}, &mem_spec_val));
+        query_options->__set_parquet_object_store_split_size(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::MEM_LIMIT_EXECUTORS: {
-        // Parse the mem limit spec and validate it.
-        int64_t bytes_limit;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "query memory limit for executors", &bytes_limit));
-        query_options->__set_mem_limit_executors(bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_mem_limit_executors(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::BROADCAST_BYTES_LIMIT: {
-        // Parse the broadcast_bytes limit and validate it
-        int64_t broadcast_bytes_limit;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "broadcast bytes limit for join operations",
-                &broadcast_bytes_limit));
-        query_options->__set_broadcast_bytes_limit(broadcast_bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_broadcast_bytes_limit(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::RETRY_FAILED_QUERIES: {
@@ -932,38 +806,27 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::PREAGG_BYTES_LIMIT: {
-        // Parse the preaggregation bytes limit and validate it
-        int64_t preagg_bytes_limit;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "preaggregation bytes limit", &preagg_bytes_limit));
-        query_options->__set_preagg_bytes_limit(preagg_bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_preagg_bytes_limit(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::MAX_CNF_EXPRS: {
-        StringParser::ParseResult result;
-        const int32_t requested_max_cnf_exprs =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || requested_max_cnf_exprs < -1) {
-          return Status(
-              Substitute("Invalid max cnf exprs : '$0'. "
-                         "Only -1 and non-negative numbers are allowed.", value));
-        }
-        query_options->__set_max_cnf_exprs(requested_max_cnf_exprs);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, -1, &int32_t_val));
+        query_options->__set_max_cnf_exprs(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS: {
-        StringParser::ParseResult result;
-        const int64_t timestamp =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || timestamp < 0) {
-          return Status(Substitute("Invalid Kudu snapshot read timestamp: Only "
-              "non-negative numbers are allowed.", value));
-        }
-        query_options->__set_kudu_snapshot_read_timestamp_micros(timestamp);
+        int64_t int64_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int64_t>(
+            option, value, &int64_t_val));
+        query_options->__set_kudu_snapshot_read_timestamp_micros(int64_t_val);
         break;
       }
       case TImpalaQueryOptions::ENABLED_RUNTIME_FILTER_TYPES: {
-        set<TRuntimeFilterType::type> filter_types;
+        std::set<TRuntimeFilterType::type> filter_types;
         if (iequals(value, "all")) {
           for (const auto& kv : _TRuntimeFilterType_VALUES_TO_NAMES) {
             filter_types.insert(static_cast<TRuntimeFilterType::type>(kv.first));
@@ -991,23 +854,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::SORT_RUN_BYTES_LIMIT: {
-        // Parse the sort bytes limit and validate it
-        int64_t sort_run_bytes_limit;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "sort run bytes limit", &sort_run_bytes_limit));
-        query_options->__set_sort_run_bytes_limit(sort_run_bytes_limit);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_sort_run_bytes_limit(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::MAX_FS_WRITERS: {
-        StringParser::ParseResult result;
-        const int32_t max_fs_writers =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || max_fs_writers < 0) {
-          return Status(Substitute("$0 is not valid for MAX_FS_WRITERS. Only "
-                                   "non-negative numbers are allowed.",
-              value));
-        }
-        query_options->__set_max_fs_writers(max_fs_writers);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_max_fs_writers(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::REFRESH_UPDATED_HMS_PARTITIONS: {
@@ -1019,14 +875,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::RUNTIME_FILTER_ERROR_RATE: {
-        StringParser::ParseResult result;
-        const double val =
-            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || val <= 0 || val >= 1) {
-          return Status(Substitute("Invalid runtime filter error rate: "
-                "'$0'. Only values between 0 to 1 (exclusive) are allowed.", value));
-        }
-        query_options->__set_runtime_filter_error_rate(val);
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckExclusiveRange<double>(
+            option, value, 0.0, 1.0, &double_val));
+        query_options->__set_runtime_filter_error_rate(double_val);
         break;
       }
       case TImpalaQueryOptions::USE_LOCAL_TZ_FOR_UNIX_TIMESTAMP_CONVERSIONS: {
@@ -1038,27 +890,19 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION: {
-         query_options->__set_enable_outer_join_to_inner_transformation(IsTrue(value));
-         break;
+        query_options->__set_enable_outer_join_to_inner_transformation(IsTrue(value));
+        break;
       }
       case TImpalaQueryOptions::TARGETED_KUDU_SCAN_RANGE_LENGTH: {
-        int64_t scan_length = 0;
-        RETURN_IF_ERROR(
-            ParseMemValue(value, "targeted kudu scan range length", &scan_length));
-        query_options->__set_targeted_kudu_scan_range_length(scan_length);
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_targeted_kudu_scan_range_length(mem_spec_val.value);
         break;
       }
       case TImpalaQueryOptions::REPORT_SKEW_LIMIT: {
-        StringParser::ParseResult result;
-        const double skew_threshold =
-            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS) {
-          return Status(
-              Substitute("$0 is not valid for REPORT_SKEW_LIMIT. Only numeric "
-                         "values (such as 1.0) are allowed.",
-                  value));
-        }
-        query_options->__set_report_skew_limit(skew_threshold);
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<double>(option, value, &double_val));
+        query_options->__set_report_skew_limit(double_val);
         break;
       }
       case TImpalaQueryOptions::USE_DOP_FOR_COSTING: {
@@ -1066,26 +910,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR: {
-        StringParser::ParseResult result;
-        const double val =
-            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1000) {
-          return Status(Substitute("Invalid broadcast to partition factor '$0'. "
-                                   "Only values from 0 to 1000 are allowed.",
-              value));
-        }
-        query_options->__set_broadcast_to_partition_factor(val);
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<double>(
+            option, value, 0.0, 1000.0, &double_val));
+        query_options->__set_broadcast_to_partition_factor(double_val);
         break;
       }
       case TImpalaQueryOptions::JOIN_ROWS_PRODUCED_LIMIT: {
-        StringParser::ParseResult result;
-        const int64_t join_rows_produced_limit =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || join_rows_produced_limit < 0) {
-          return Status(Substitute("Invalid join rows produced limit: '$0'. "
-                                   "Only non-negative numbers are allowed.", value));
-        }
-        query_options->__set_join_rows_produced_limit(join_rows_produced_limit);
+        int64_t int64_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int64_t>(
+            option, value, &int64_t_val));
+        query_options->__set_join_rows_produced_limit(int64_t_val);
         break;
       }
       case TImpalaQueryOptions::UTF8_MODE: {
@@ -1093,30 +928,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::ANALYTIC_RANK_PUSHDOWN_THRESHOLD: {
-        StringParser::ParseResult status;
-        int64_t val =
-            StringParser::StringToInt<int64_t>(value.c_str(), value.size(), &status);
-        if (status != StringParser::PARSE_SUCCESS) {
-          return Status(Substitute("Invalid threshold: '$0'.", value));
-        }
-        if (val < -1) {
-          return Status(Substitute("Invalid threshold: '$0'. Only non-negative values "
-                "and -1 are allowed.", val));
-        }
-        query_options->__set_analytic_rank_pushdown_threshold(val);
+        int64_t int64_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int64_t>(
+            option, value, -1, &int64_t_val));
+        query_options->__set_analytic_rank_pushdown_threshold(int64_t_val);
         break;
       }
       case TImpalaQueryOptions::DEFAULT_NDV_SCALE: {
-        StringParser::ParseResult result;
-        const int32_t scale =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (value == nullptr || result != StringParser::PARSE_SUCCESS || scale < 1 ||
-            scale > 10) {
-          return Status(
-              Substitute("Invalid NDV scale: '$0'. "
-                         "Only integer value in [1, 10] is allowed.", value));
-        }
-        query_options->__set_default_ndv_scale(scale);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<int32_t>(
+            option, value, 1, 10, &int32_t_val));
+        query_options->__set_default_ndv_scale(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::KUDU_REPLICA_SELECTION: {
@@ -1155,27 +977,19 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::PARQUET_LATE_MATERIALIZATION_THRESHOLD: {
-        StringParser::ParseResult result;
-        const int32_t threshold =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (value == nullptr || result != StringParser::PARSE_SUCCESS || threshold < -1 ||
-            threshold == 0) {
-          return Status(Substitute("Invalid parquet late materialization threshold: "
-              "'$0'. Use -1 to disable the feature, and values > 0 to specify the "
-              "minimum skip length.", value));
-        }
-        query_options->__set_parquet_late_materialization_threshold(threshold);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        RETURN_IF_ERROR(
+            QueryOptionValidator<int32_t>::InclusiveLowerBound(option, int32_t_val, -1));
+        RETURN_IF_ERROR(QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 0));
+        query_options->__set_parquet_late_materialization_threshold(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT: {
-        StringParser::ParseResult result;
-        const int32_t limit =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (value == nullptr || result != StringParser::PARSE_SUCCESS || limit < 0) {
-          return Status(Substitute("Invalid parquet parquet dictionary runtime filter "
-              "entry limit '$0'. Only integer value 0 and above is allowed.", value));
-        }
-        query_options->__set_parquet_dictionary_runtime_filter_entry_limit(limit);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_parquet_dictionary_runtime_filter_entry_limit(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION: {
@@ -1191,25 +1005,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT: {
-        StringParser::ParseResult result;
-        const int32_t limit =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (value == nullptr || result != StringParser::PARSE_SUCCESS || limit < 0) {
-          return Status(Substitute("Invalid runtime in-list filter entry limit '$0'. "
-              "Only integer value 0 and above is allowed.", value));
-        }
-        query_options->__set_runtime_in_list_filter_entry_limit(limit);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_runtime_in_list_filter_entry_limit(int32_t_val);
+        break;
       }
       case TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S: {
-        StringParser::ParseResult result;
-        const int32_t lock_max_wait =
-            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (value == nullptr || result != StringParser::PARSE_SUCCESS ||
-            lock_max_wait < 0) {
-          return Status(Substitute("Invalid value for LOCK_MAX_WAIT_TIME_S: '$0'. "
-              "Only integer value 0 and above is allowed.", value));
-        }
-        query_options->__set_lock_max_wait_time_s(lock_max_wait);
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+            option, value, &int32_t_val));
+        query_options->__set_lock_max_wait_time_s(int32_t_val);
         break;
       }
       case TImpalaQueryOptions::ENABLE_REPLAN: {
@@ -1242,7 +1048,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
         DCHECK(false);
         break;
     }
-    if (set_query_options_mask != NULL) {
+    if (set_query_options_mask != nullptr) {
       DCHECK_LT(option, set_query_options_mask->size());
       set_query_options_mask->set(option);
     }
@@ -1258,7 +1064,7 @@ Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_opt
   // Construct an error status which is used to aggregate errors encountered during
   // parsing. It is only returned if the number of error details is greater than 0.
   Status errorStatus = Status::Expected("Errors parsing query options");
-  for (string& kv_string: kv_pairs) {
+  for (string& kv_string : kv_pairs) {
     trim(kv_string);
     if (kv_string.length() == 0) continue;
     vector<string> key_value;
@@ -1268,8 +1074,8 @@ Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_opt
           Status(Substitute("Invalid configuration option '$0'.", kv_string)));
       continue;
     }
-    errorStatus.MergeStatus(SetQueryOption(key_value[0], key_value[1], query_options,
-        set_query_options_mask));
+    errorStatus.MergeStatus(SetQueryOption(
+        key_value[0], key_value[1], query_options, set_query_options_mask));
   }
   if (errorStatus.msg().details().size() > 0) return errorStatus;
   return Status::OK();
@@ -1293,18 +1099,13 @@ Status impala::ValidateQueryOptions(TQueryOptions* query_options) {
   return Status::OK();
 }
 
-void impala::PopulateQueryOptionLevels(QueryOptionLevels* query_option_levels)
-{
-#define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
-  {\
-    (*query_option_levels)[#ENUM] = LEVEL;\
-  }
-#define REMOVED_QUERY_OPT_FN(NAME, ENUM)\
-  {\
-    (*query_option_levels)[#ENUM] = TQueryOptionLevel::REMOVED;\
-  }
-  QUERY_OPTS_TABLE
-  QUERY_OPT_FN(support_start_over, SUPPORT_START_OVER, TQueryOptionLevel::ADVANCED)
+void impala::PopulateQueryOptionLevels(QueryOptionLevels* query_option_levels){
+#define QUERY_OPT_FN(NAME, ENUM, LEVEL) \
+  { (*query_option_levels)[#ENUM] = LEVEL; }
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM) \
+  { (*query_option_levels)[#ENUM] = TQueryOptionLevel::REMOVED; }
+    QUERY_OPTS_TABLE QUERY_OPT_FN(
+        support_start_over, SUPPORT_START_OVER, TQueryOptionLevel::ADVANCED)
 #undef QUERY_OPT_FN
 #undef REMOVED_QUERY_OPT_FN
 }
diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h
index 907349e18..2ef4f5135 100644
--- a/be/src/util/parse-util.h
+++ b/be/src/util/parse-util.h
@@ -29,6 +29,27 @@
 
 namespace impala {
 
+struct MemSpec {
+  int64_t value = {};
+  // MemSpec parsing can't distinguish 0 from infinity/unspecified (-1 is parsed as 0).
+  // When set, 0 prints as "Inf/Not specified"; leave it false so range bounds print 0.
+  bool could_be_inf_or_unspecified = false;
+
+  bool operator<(const MemSpec& other) const { return this->value < other.value; }
+  bool operator<=(const MemSpec& other) const { return this->value <= other.value; }
+  bool operator>(const MemSpec& other) const { return this->value > other.value; }
+  bool operator>=(const MemSpec& other) const { return this->value >= other.value; }
+
+  friend std::ostream& operator<<(std::ostream& os, const MemSpec& mem_spec) {
+    if (mem_spec.could_be_inf_or_unspecified && mem_spec.value == 0) {
+      os << "Inf/Not specified";
+    } else {
+      os << mem_spec.value;
+    }
+    return os;
+  }
+};
+
 /// Utility class for parsing information from strings.
 class ParseUtil {
  public:
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 23e08a8f5..3bbe83672 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -477,6 +477,8 @@ error_codes = (
   ("JWT_VERIFY_FAILED", 154, "Error verifying JWT Token: $0."),
 
   ("PARQUET_ROWS_SKIPPING", 155, "Couldn't skip rows in column '$0' in file '$1'."),
+
+  ("QUERY_OPTION_PARSE_FAILED", 156, "Failed to parse query option '$0': $1")
 )
 
 import sys
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index f3a49aaeb..a555a6b04 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -87,7 +87,7 @@ STRING, STRING, STRING
 ---- QUERY
 set parquet_file_size='2g'
 ---- CATCH
-The PARQUET_FILE_SIZE query option must be less than 2GB.
+Invalid value for query option PARQUET_FILE_SIZE: 2147483648 is not in range [0, 2147483647]
 ====
 ---- QUERY
 set foo=bar
@@ -266,12 +266,12 @@ explain select count(distinct double_col) from functional.alltypesagg;
 ---- QUERY
 set max_row_size=-1;
 ---- CATCH
-Invalid max row size of -1. Valid sizes are in [1, 1099511627776]
+Invalid value for query option MAX_ROW_SIZE: Inf/Not specified is not in range [1, 1099511627776]
 ====
 ---- QUERY
 set max_row_size=0;
 ---- CATCH
-Invalid max row size of 0. Valid sizes are in [1, 1099511627776]
+Invalid value for query option MAX_ROW_SIZE: Inf/Not specified is not in range [1, 1099511627776]
 ====
 ---- QUERY
 # Setting some removed query options should be a no-op.
@@ -293,10 +293,10 @@ set PARQUET_LATE_MATERIALIZATION_THRESHOLD=10000;
 ---- QUERY
 set PARQUET_LATE_MATERIALIZATION_THRESHOLD=-2;
 ---- CATCH
-Invalid parquet late materialization threshold: '-2'. Use -1 to disable the feature, and values > 0 to specify the minimum skip length.
+Invalid value for query option PARQUET_LATE_MATERIALIZATION_THRESHOLD: Value must be greater than or equal to -1, actual value: -2
 ====
 ---- QUERY
 set PARQUET_LATE_MATERIALIZATION_THRESHOLD=0;
 ---- CATCH
-Invalid parquet late materialization threshold: '0'. Use -1 to disable the feature, and values > 0 to specify the minimum skip length.
+Invalid value for query option PARQUET_LATE_MATERIALIZATION_THRESHOLD: Value can't be 0
 ====