You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/05/15 17:08:48 UTC

[impala] branch master updated (d12675a -> 3567a2b)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from d12675a  IMPALA-8072: addendum: don't require fe rebuild for config
     new 9dd8d82  IMPALA-8369: Fixing some core tests in Hive environment
     new 454d85b  IMPALA-8537: Negative values reported for tmp-file-mgr.scratch-space-bytes-used under heavy spilling load
     new 30ea70a  IMPALA-966: Attribute type error to the right expression in an insert statement
     new bda8d95  IMPALA-8369 (part 3): Hive 3: fix test_permanent_udfs.py for Hive 3 support
     new efae8dc  IMPALA-8138: Remove FAULT_INJECTION_RPC_DELAY
     new 3567a2b  IMPALA-8369 (part 4): Hive 3: fixes for functional dataset loading

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |  9 ++-
 be/src/runtime/tmp-file-mgr-test.cc                | 82 +++++++++++++++++++++-
 be/src/runtime/tmp-file-mgr.cc                     |  8 +--
 be/src/service/control-service.cc                  | 15 ++--
 be/src/service/data-stream-service.cc              |  4 +-
 be/src/service/impala-internal-service.cc          |  8 ++-
 be/src/testutil/fault-injection-util.cc            | 16 -----
 be/src/testutil/fault-injection-util.h             | 24 -------
 be/src/util/debug-util.cc                          |  6 +-
 be/src/util/debug-util.h                           | 27 ++++---
 be/src/util/metrics.h                              |  2 +
 .../java/org/apache/impala/analysis/Analyzer.java  | 21 ++++--
 .../org/apache/impala/analysis/InsertStmt.java     | 22 ++++--
 .../org/apache/impala/analysis/ModifyStmt.java     |  2 +-
 .../org/apache/impala/analysis/StatementBase.java  | 17 +++--
 .../java/org/apache/impala/analysis/UnionStmt.java | 12 +++-
 .../apache/impala/analysis/AnalyzeExprsTest.java   |  2 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   | 33 ++++++++-
 .../catalog/CatalogObjectToFromThriftTest.java     |  1 +
 fe/src/test/resources/hive-site.xml.py             | 19 ++++-
 testdata/bin/create-load-data.sh                   |  5 ++
 ...te-mini.sql => load-dependent-tables-hive2.sql} | 28 ++++----
 testdata/bin/load-dependent-tables.sql             | 10 +--
 testdata/bin/load_nested.py                        | 50 +++++++++----
 .../functional/functional_schema_template.sql      |  6 +-
 tests/common/environ.py                            |  2 +
 tests/common/impala_test_suite.py                  |  4 +-
 tests/custom_cluster/test_permanent_udfs.py        | 59 +++++++---------
 tests/custom_cluster/test_rpc_timeout.py           | 28 ++++----
 29 files changed, 336 insertions(+), 186 deletions(-)
 copy testdata/bin/{create-mini.sql => load-dependent-tables-hive2.sql} (63%)


[impala] 05/06: IMPALA-8138: Remove FAULT_INJECTION_RPC_DELAY

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit efae8dcf3b70ec1e0ccb7bdd45084b03c0be4354
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Fri Mar 29 11:44:22 2019 -0700

    IMPALA-8138: Remove FAULT_INJECTION_RPC_DELAY
    
    This patch removes the FAULT_INJECTION_RPC_DELAY macro and replaces
    its uses with DebugAction which is more flexible. For example, it
    supports JITTER which injects random delays.
    
    Every backend rpc has a debug action of the form RPC_NAME_DELAY.
    
    DebugAction has previously always been used via query options.
    However, for the rpcs considered here there is not always a query with
    an accessible TQueryOptions available (for example, we do not send any
    query info with the RemoteShutdown rpc), so this patch introduces a
    flag, '--debug_actions', which is used to control these rpc delay
    debug actions.
    
    Testing:
    - Updated existing tests to use the new mechanism.
    
    Change-Id: I712b188e0cdf91f431c9b94052501e5411af407b
    Reviewed-on: http://gerrit.cloudera.org:8080/13060
    Reviewed-by: Thomas Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc             |  9 ++++-----
 be/src/service/control-service.cc         | 15 ++++++++-------
 be/src/service/data-stream-service.cc     |  4 +++-
 be/src/service/impala-internal-service.cc |  8 +++++---
 be/src/testutil/fault-injection-util.cc   | 16 ----------------
 be/src/testutil/fault-injection-util.h    | 24 ------------------------
 be/src/util/debug-util.cc                 |  6 ++----
 be/src/util/debug-util.h                  | 27 +++++++++++++++++----------
 tests/custom_cluster/test_rpc_timeout.py  | 28 +++++++++++++++-------------
 9 files changed, 54 insertions(+), 83 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ca1261a..427e777 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -136,11 +136,6 @@ DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes d
     "stream receiver registration to be delayed. Effective in debug builds only.");
 DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime filtering for"
     "testing purposes. Effective in debug builds only.");
-DEFINE_int32(fault_injection_rpc_delay_ms, 0, "A fault injection option that causes "
-    "rpc server handling to be delayed to trigger an RPC timeout on the caller side. "
-    "Effective in debug builds only.");
-DEFINE_int32(fault_injection_rpc_type, 0, "A fault injection option that specifies "
-    "which rpc call will be injected with the delay. Effective in debug builds only.");
 DEFINE_int32(fault_injection_rpc_exception_type, 0, "A fault injection option that "
     "specifies the exception to be thrown in the caller side of an RPC call. Effective "
     "in debug builds only");
@@ -156,6 +151,10 @@ DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra d
     " in milliseconds when the I/O manager is reading from disk.");
 #endif
 
+DEFINE_string(debug_actions, "", "For testing only. Uses the same format as the debug "
+    "action query options, but allows for injection of debug actions in code paths where "
+    "query options are not available.");
+
 // Used for testing the path where the Kudu client is stubbed.
 DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled.");
 
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index d48b873..2c903d1 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -49,6 +49,7 @@ DEFINE_string(control_service_queue_mem_limit, "50MB", QUEUE_LIMIT_MSG.c_str());
 DEFINE_int32(control_service_num_svc_threads, 0, "Number of threads for processing "
     "control service's RPCs. if left at default value 0, it will be set to number of "
     "CPU cores. Set it to a positive value to change from the default.");
+DECLARE_string(debug_actions);
 
 namespace impala {
 
@@ -116,6 +117,9 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
   shared_ptr<ClientRequestState> request_state =
       ExecEnv::GetInstance()->impala_server()->GetClientRequestState(query_id);
 
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "REPORT_EXEC_STATUS_DELAY");
+
   if (request_state.get() == nullptr) {
     // This is expected occasionally (since a report RPC might be in flight while
     // cancellation is happening). Return an error to the caller to get it to stop.
@@ -128,9 +132,6 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
     return;
   }
 
-  // This failpoint is to allow jitter to be injected.
-  DebugActionNoFail(request_state->query_options(), "REPORT_EXEC_STATUS_DELAY");
-
   // The runtime profile is sent as a Thrift serialized buffer via sidecar. Get the
   // sidecar and deserialize the thrift profile if there is any. The sender may have
   // failed to serialize the Thrift profile so an empty thrift profile is valid.
@@ -167,8 +168,8 @@ void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB*
   DCHECK(request->has_query_id());
   const TUniqueId& query_id = ProtoToQueryId(request->query_id());
   VLOG_QUERY << "CancelQueryFInstances(): query_id=" << PrintId(query_id);
-  // TODO(IMPALA-8143) Use DebugAction for fault injection.
-  FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "CANCEL_QUERY_FINSTANCES_DELAY");
   QueryState::ScopedRef qs(query_id);
   if (qs.get() == nullptr) {
     Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
@@ -182,8 +183,8 @@ void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB*
 
 void ControlService::RemoteShutdown(const RemoteShutdownParamsPB* req,
     RemoteShutdownResultPB* response, RpcContext* rpc_context) {
-  // TODO(IMPALA-8143) Use DebugAction for fault injection.
-  FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "REMOTE_SHUTDOWN_DELAY");
   Status status = ExecEnv::GetInstance()->impala_server()->StartShutdown(
       req->has_deadline_s() ? req->deadline_s() : -1,
       response->mutable_shutdown_status());
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 1859139..890ceec 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -49,6 +49,7 @@ DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str())
 DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
     "datastream services' RPCs. If left at default value 0, it will be set to number of "
     "CPU cores.  Set it to a positive value to change from the default.");
+DECLARE_string(debug_actions);
 
 namespace impala {
 
@@ -94,13 +95,14 @@ bool DataStreamService::Authorize(const google::protobuf::Message* req,
 
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {
+  DebugActionNoFail(FLAGS_debug_actions, "END_DATA_STREAM_DELAY");
   // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.
   ExecEnv::GetInstance()->stream_mgr()->CloseSender(request, response, rpc_context);
 }
 
 void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
-  FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
+  DebugActionNoFail(FLAGS_debug_actions, "TRANSMIT_DATA_DELAY");
   // AddData() is guaranteed to eventually respond to this RPC so we don't do it here.
   ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
 }
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index d4ed14d..5260253 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -32,6 +32,8 @@
 
 using namespace impala;
 
+DECLARE_string(debug_actions);
+
 ImpalaInternalService::ImpalaInternalService() {
   impala_server_ = ExecEnv::GetInstance()->impala_server();
   DCHECK(impala_server_ != nullptr);
@@ -41,7 +43,7 @@ ImpalaInternalService::ImpalaInternalService() {
 
 void ImpalaInternalService::ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
     const TExecQueryFInstancesParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_EXECQUERYFINSTANCES);
+  DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
   DCHECK(params.__isset.coord_state_idx);
   DCHECK(params.__isset.query_ctx);
   DCHECK(params.__isset.fragment_ctxs);
@@ -68,7 +70,7 @@ template <typename T> void SetUnknownIdError(
 
 void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
     const TUpdateFilterParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
+  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
   DCHECK(params.__isset.filter_id);
   DCHECK(params.__isset.query_id);
   DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
@@ -77,7 +79,7 @@ void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
 
 void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
     const TPublishFilterParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER);
+  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
   DCHECK(params.__isset.filter_id);
   DCHECK(params.__isset.dst_query_id);
   DCHECK(params.__isset.dst_fragment_idx);
diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc
index 14e00ef..48d3a9e 100644
--- a/be/src/testutil/fault-injection-util.cc
+++ b/be/src/testutil/fault-injection-util.cc
@@ -28,8 +28,6 @@
 
 #include "common/names.h"
 
-DECLARE_int32(fault_injection_rpc_delay_ms);
-DECLARE_int32(fault_injection_rpc_type);
 DECLARE_int32(fault_injection_rpc_exception_type);
 
 namespace impala {
@@ -37,20 +35,6 @@ namespace impala {
 using apache::thrift::transport::TTransportException;
 using apache::thrift::transport::TSSLException;
 
-int32_t FaultInjectionUtil::GetTargetRPCType() {
-  int32_t target_rpc_type = FLAGS_fault_injection_rpc_type;
-  if (target_rpc_type == RPC_RANDOM) target_rpc_type = rand() % RPC_RANDOM;
-  DCHECK_LT(target_rpc_type, RPC_RANDOM);
-  return target_rpc_type;
-}
-
-void FaultInjectionUtil::InjectRpcDelay(RpcCallType my_type) {
-  int32_t delay_ms = FLAGS_fault_injection_rpc_delay_ms;
-  if (delay_ms == 0) return;
-  int32_t target_rpc_type = GetTargetRPCType();
-  if (target_rpc_type == my_type) SleepForMs(delay_ms);
-}
-
 void FaultInjectionUtil::InjectRpcException(bool is_send, int freq) {
   static AtomicInt32 send_count(-1);
   static AtomicInt32 recv_count(-1);
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
index f545e1f..0ab77d0 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -26,17 +26,6 @@ namespace impala {
 
 class FaultInjectionUtil {
  public:
-  enum RpcCallType {
-    RPC_NULL = 0,
-    RPC_EXECQUERYFINSTANCES,
-    RPC_CANCELQUERYFINSTANCES,
-    RPC_PUBLISHFILTER,
-    RPC_UPDATEFILTER,
-    RPC_TRANSMITDATA,
-    RPC_REPORTEXECSTATUS,
-    RPC_REMOTESHUTDOWN,
-    RPC_RANDOM    // This must be last.
-  };
 
   enum RpcExceptionType {
     RPC_EXCEPTION_NONE = 0,
@@ -52,26 +41,14 @@ class FaultInjectionUtil {
     RPC_EXCEPTION_SSL_RECV_TIMEDOUT,
   };
 
-  /// Test util function that injects delays to specified RPC server handling function
-  /// so that RPC caller could hit the RPC recv timeout condition.
-  /// 'my_type' specifies which RPC type of the current function.
-  /// FLAGS_fault_injection_rpc_type specifies which RPC function the delay should
-  /// be enabled. FLAGS_fault_injection_rpc_delay_ms specifies the delay in ms.
-  static void InjectRpcDelay(RpcCallType my_type);
-
   /// Test util function that injects exceptions to RPC client functions.
   /// 'is_send' indicates whether injected fault is at the send() or recv() of an RPC.
   /// The exception specified in 'FLAGS_fault_injection_rpc_exception_type' is injected
   /// on every 'freq' invocations of this function.
   static void InjectRpcException(bool is_send, int freq);
 
- private:
-  static int32_t GetTargetRPCType();
-
 };
 
-#define FAULT_INJECTION_RPC_DELAY(type)                          \
-    FaultInjectionUtil::InjectRpcDelay(FaultInjectionUtil::type)
 #define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq)                 \
     FaultInjectionUtil::InjectRpcException(true, freq)
 #define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq)                 \
@@ -79,7 +56,6 @@ class FaultInjectionUtil {
 
 #else // NDEBUG
 
-#define FAULT_INJECTION_RPC_DELAY(type)
 #define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq)
 #define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq)
 
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 552c16c..7029a71 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -328,10 +328,8 @@ static bool ParseProbability(const string& prob_str, bool* should_execute) {
   return true;
 }
 
-Status DebugActionImpl(
-    const TQueryOptions& query_options, const char* label) {
-  const DebugActionTokens& action_list = TokenizeDebugActions(
-      query_options.debug_action);
+Status DebugActionImpl(const string& debug_action, const char* label) {
+  const DebugActionTokens& action_list = TokenizeDebugActions(debug_action);
   static const char ERROR_MSG[] = "Invalid debug_action $0:$1 ($2)";
   for (const vector<string>& components : action_list) {
     // size() != 2 check filters out ExecNode debug actions.
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 1ca53b5..b2235ac 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -140,19 +140,29 @@ DebugActionTokens TokenizeDebugActions(const string& debug_actions);
 /// becomes {"x", "y"} and "x" becomes {"x"}.
 std::vector<std::string> TokenizeDebugActionParams(const string& action);
 
-/// Slow path implementing DebugAction() for the case where
-/// 'query_options.debug_action' is non-empty.
-Status DebugActionImpl(
-    const TQueryOptions& query_options, const char* label) WARN_UNUSED_RESULT;
+/// Slow path implementing DebugAction() for the case where 'debug_action' is non-empty.
+Status DebugActionImpl(const string& debug_action, const char* label) WARN_UNUSED_RESULT;
 
 /// If debug_action query option has a "global action" (i.e. not exec-node specific)
 /// and matches the given 'label', apply the the action. See ImpalaService.thrift for
 /// details of the format and available global actions. For ExecNode code, use
 /// ExecNode::ExecDebugAction() instead.
 WARN_UNUSED_RESULT static inline Status DebugAction(
+    const string& debug_action, const char* label) {
+  if (LIKELY(debug_action.empty())) return Status::OK();
+  return DebugActionImpl(debug_action, label);
+}
+
+WARN_UNUSED_RESULT static inline Status DebugAction(
     const TQueryOptions& query_options, const char* label) {
-  if (LIKELY(query_options.debug_action.empty())) return Status::OK();
-  return DebugActionImpl(query_options, label);
+  return DebugAction(query_options.debug_action, label);
+}
+
+static inline void DebugActionNoFail(const string& debug_action, const char* label) {
+  Status status = DebugAction(debug_action, label);
+  if (!status.ok()) {
+    LOG(ERROR) << "Ignoring debug action failure: " << status.GetDetail();
+  }
 }
 
 /// Like DebugAction() but for use in contexts that can't safely propagate an error
@@ -160,10 +170,7 @@ WARN_UNUSED_RESULT static inline Status DebugAction(
 /// and ignored.
 static inline void DebugActionNoFail(
     const TQueryOptions& query_options, const char* label) {
-  Status status = DebugAction(query_options, label);
-  if (!status.ok()) {
-    LOG(ERROR) << "Ignoring debug action failure: " << status.GetDetail();
-  }
+  DebugActionNoFail(query_options.debug_action, label);
 }
 
 // FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 40b02fc..166b385 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -85,7 +85,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=1000 --fault_injection_rpc_type=1"
+      " --debug_actions=EXEC_QUERY_FINSTANCES_DELAY:SLEEP@1000"
       " --datastream_sender_timeout_ms=30000")
   def test_execqueryfinstances_race(self, vector):
     """ Test for IMPALA-7464, where the rpc times out while the rpc handler continues to
@@ -94,7 +94,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=1"
+      " --debug_actions=EXEC_QUERY_FINSTANCES_DELAY:SLEEP@3000"
       " --datastream_sender_timeout_ms=30000")
   def test_execqueryfinstances_timeout(self, vector):
     for i in range(3):
@@ -109,7 +109,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=2"
+      " --debug_actions=CANCEL_QUERY_FINSTANCES_DELAY:SLEEP@3000"
       " --datastream_sender_timeout_ms=30000")
   def test_cancelplanfragment_timeout(self, vector):
     query = "select * from tpch.lineitem limit 5000"
@@ -117,20 +117,22 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=3")
+      " --debug_actions=PUBLISH_FILTER_DELAY:SLEEP@3000")
   def test_publishfilter_timeout(self, vector):
     self.execute_runtime_filter_query()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=4")
+      " --debug_actions=UPDATE_FILTER_DELAY:SLEEP@3000")
   def test_updatefilter_timeout(self, vector):
     self.execute_runtime_filter_query()
 
+  all_rpcs = ["EXEC_QUERY_FINSTANCES", "CANCEL_QUERY_FINSTANCES", "PUBLISH_FILTER",
+      "UPDATE_FILTER", "TRANSMIT_DATA", "END_DATA_STREAM", "REMOTE_SHUTDOWN"]
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=7"
-      " --datastream_sender_timeout_ms=30000")
+      " --datastream_sender_timeout_ms=30000 --debug_actions=%s" %
+      "|".join(map(lambda rpc: "%s_DELAY:JITTER@3000@0.1" % rpc, all_rpcs)))
   def test_random_rpc_timeout(self, vector):
     self.execute_query_verify_metrics(self.TEST_QUERY, None, 10)
 
@@ -138,15 +140,15 @@ class TestRPCTimeout(CustomClusterTestSuite):
   # Useful for triggering IMPALA-8274.
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--status_report_interval_ms=100"
-      " --backend_client_rpc_timeout_ms=100")
+      " --backend_client_rpc_timeout_ms=100"
+      " --debug_actions=REPORT_EXEC_STATUS_DELAY:JITTER@110@0.7")
   def test_reportexecstatus_jitter(self, vector):
     LONG_RUNNING_QUERY = "with v as (select t1.ss_hdemo_sk as xk " +\
        "from tpcds_parquet.store_sales t1, tpcds_parquet.store_sales t2 " +\
        "where t1.ss_hdemo_sk = t2.ss_hdemo_sk) " +\
        "select count(*) from v, tpcds_parquet.household_demographics t3 " +\
        "where v.xk = t3.hd_demo_sk"
-    query_options = {'debug_action': 'REPORT_EXEC_STATUS_DELAY:JITTER@110@0.7'}
-    self.execute_query_verify_metrics(LONG_RUNNING_QUERY, query_options, 1)
+    self.execute_query_verify_metrics(LONG_RUNNING_QUERY, None, 1)
 
   # Use a small service queue memory limit and a single service thread to exercise
   # the retry paths in the ReportExecStatus() RPC
@@ -165,16 +167,16 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=100"
-      " --status_report_interval_ms=1000 --status_report_max_retry_s=1000")
+      " --status_report_interval_ms=1000 --status_report_max_retry_s=1000"
+      " --debug_actions=REPORT_EXEC_STATUS_DELAY:SLEEP@1000")
   def test_reportexecstatus_retries(self, unique_database):
     tbl = "%s.kudu_test" % unique_database
     self.execute_query("create table %s (a int primary key) stored as kudu" % tbl)
     # Since the sleep time (1000ms) is much longer than the rpc timeout (100ms), all
     # reports will appear to fail. The query is designed to result in many intermediate
     # status reports but fewer than the max allowed failures, so the query should succeed.
-    query_options = {'debug_action': 'REPORT_EXEC_STATUS_DELAY:SLEEP@1000'}
     result = self.execute_query(
-        "insert into %s select 0 from tpch.lineitem limit 100000" % tbl, query_options)
+        "insert into %s select 0 from tpch.lineitem limit 100000" % tbl)
     assert result.success, str(result)
     # Ensure that the error log was tracked correctly - all but the first row inserted
     # should result in a 'key already present' insert error.


[impala] 02/06: IMPALA-8537: Negative values reported for tmp-file-mgr.scratch-space-bytes-used under heavy spilling load

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 454d85be8a5b003c725586d2f75ee672925f1962
Author: Abhishek <ar...@cloudera.com>
AuthorDate: Mon May 13 16:43:08 2019 -0700

    IMPALA-8537: Negative values reported for
    tmp-file-mgr.scratch-space-bytes-used under heavy spilling load
    
    Whenever closing a FileGroup, the TmpFileMgr::scratch_bytes_used_metric_
    was incorrectly being decremented by the total scratch space bytes
    across the entire FileGroup
    (i.e., FileGroup::scratch_space_bytes_used_counter_), for every File in
    the FileGroup. This resulted in a negative value for the current
    scratch space bytes.
    
    The fix is to decrement the TmpFileMgr::scratch_bytes_used_metric_ by
    the FileGroup::scratch_space_bytes_used_counter_, only once when the
    FileGroup is closed.
    
    Testing Done:
    - Added checks for expected current value and HWM of the scratch space
      bytes in some of the existing test units in tmp-file-mgr-test.cc.
    - Added a new scenario in tmp-file-mgr-test.cc which mimics concurrent
      spilling queries and checks for proper current and HWM values for
      the scratch space bytes.
    - Ad-hoc tests forcing multiple scratch space dirs/files and running
      concurrent spilling queries while making sure the current value is
      never negative.
    
    Change-Id: I338ecc06ddfad414091bd50f683b767b61abdcc4
    Reviewed-on: http://gerrit.cloudera.org:8080/13326
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/tmp-file-mgr-test.cc | 82 ++++++++++++++++++++++++++++++++++++-
 be/src/runtime/tmp-file-mgr.cc      |  8 ++--
 be/src/util/metrics.h               |  2 +
 3 files changed, 86 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index bd53dd6..f838c47 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -92,6 +92,16 @@ class TmpFileMgrTest : public ::testing::Test {
     }
   }
 
+  /// Check that current scratch space bytes and HWM match the expected values.
+  void checkHWMMetrics(int64_t exp_current_value, int64_t exp_hwm_value) {
+    AtomicHighWaterMarkGauge* hwm_value =
+        metrics_->FindMetricForTesting<AtomicHighWaterMarkGauge>(
+            "tmp-file-mgr.scratch-space-bytes-used-high-water-mark");
+    IntGauge* current_value = hwm_value->current_value_;
+    ASSERT_EQ(current_value->GetValue(), exp_current_value);
+    ASSERT_EQ(hwm_value->GetValue(), exp_hwm_value);
+  }
+
   void RemoveAndCreateDirs(const vector<string>& dirs) {
     for (const string& dir: dirs) {
       ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
@@ -393,15 +403,24 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
   ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
   ASSERT_NE(string::npos, status.msg().msg().find(GetBackendString()));
 
+  // Check HWM metrics
+  checkHWMMetrics(LIMIT, LIMIT);
   file_group.Close();
+  checkHWMMetrics(0, LIMIT);
 }
 
 // Test that scratch file ranges of varying length are recycled as expected.
 TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
   TUniqueId id;
-  TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), profile_, id);
+
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
   int64_t expected_scratch_bytes_allocated = 0;
   // Test some different allocation sizes.
+  checkHWMMetrics(0, 0);
   for (int alloc_size = 64; alloc_size <= 64 * 1024; alloc_size *= 2) {
     // Generate some data.
     const int BLOCKS = 5;
@@ -417,6 +436,7 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
     // 'file_group' should allocate extra scratch bytes for this 'alloc_size'.
     expected_scratch_bytes_allocated += alloc_size * BLOCKS;
     const int TEST_ITERS = 5;
+
     // Make sure free space doesn't grow over several iterations.
     for (int i = 0; i < TEST_ITERS; ++i) {
       cb_counter_ = 0;
@@ -426,6 +446,7 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
       }
       WaitForCallbacks(BLOCKS);
       EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group));
+      checkHWMMetrics(expected_scratch_bytes_allocated, expected_scratch_bytes_allocated);
 
       // Read back and validate.
       for (int j = 0; j < BLOCKS; ++j) {
@@ -437,10 +458,11 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
       // Check that the space is still in use - it should be recycled by the next
       // iteration.
       EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group));
+      checkHWMMetrics(expected_scratch_bytes_allocated, expected_scratch_bytes_allocated);
     }
   }
   file_group.Close();
-  test_env_->TearDownQueries();
+  checkHWMMetrics(0, expected_scratch_bytes_allocated);
 }
 
 // Regression test for IMPALA-4748, where hitting the process memory limit caused
@@ -565,6 +587,62 @@ void TmpFileMgrTest::TestBlockVerification() {
   file_group.Close();
   test_env_->TearDownQueries();
 }
+
+// Test that the current scratch space bytes and HWM values are proper when different
+// FileGroups are used concurrently. This test unit mimics concurrent spilling queries.
+TEST_F(TmpFileMgrTest, TestHWMMetric) {
+  RuntimeProfile* profile_1 = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test-1");
+  RuntimeProfile* profile_2 = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test-2");
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+
+  const int64_t LIMIT = 128;
+  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
+  const int64_t ALLOC_SIZE = 64;
+  TUniqueId id_1;
+  TmpFileMgr::FileGroup file_group_1(&tmp_file_mgr, io_mgr(), profile_1, id_1, LIMIT);
+  TUniqueId id_2;
+  TmpFileMgr::FileGroup file_group_2(&tmp_file_mgr, io_mgr(), profile_2, id_2, LIMIT);
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group_1, &files));
+  ASSERT_OK(CreateFiles(&file_group_2, &files));
+
+  Status status;
+  int64_t offset;
+  TmpFileMgr::File* alloc_file;
+
+  // Alloc from file_group_1 and file_group_2 interleaving allocations.
+  SetNextAllocationIndex(&file_group_1, 0);
+  ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files[0]);
+  ASSERT_EQ(0, offset);
+
+  SetNextAllocationIndex(&file_group_2, 0);
+  ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files[2]);
+  ASSERT_EQ(0, offset);
+
+  ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_EQ(0, offset);
+  ASSERT_EQ(alloc_file, files[1]);
+
+  ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_EQ(0, offset);
+  ASSERT_EQ(alloc_file, files[3]);
+
+  EXPECT_EQ(LIMIT, BytesAllocated(&file_group_1));
+  EXPECT_EQ(LIMIT, BytesAllocated(&file_group_2));
+
+  // Check HWM metrics
+  checkHWMMetrics(2 * LIMIT, 2 * LIMIT);
+  file_group_1.Close();
+  checkHWMMetrics(LIMIT, 2 * LIMIT);
+  file_group_2.Close();
+  checkHWMMetrics(0, 2 * LIMIT);
+}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 32c1509..372e301 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -283,14 +283,14 @@ void TmpFileMgr::FileGroup::Close() {
   if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_.get());
   for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
     Status status = file->Remove();
-    if (status.ok()) {
-      tmp_file_mgr_->scratch_bytes_used_metric_->Increment(
-          -1 * scratch_space_bytes_used_counter_->value());
-    } else {
+    if (!status.ok()) {
       LOG(WARNING) << "Error removing scratch file '" << file->path()
                    << "': " << status.msg().msg();
     }
   }
+  tmp_file_mgr_->scratch_bytes_used_metric_->Increment(
+      -1 * scratch_space_bytes_used_counter_->value());
+
   tmp_files_.clear();
 }
 
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 80b863b..80f899d 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -288,6 +288,8 @@ class AtomicHighWaterMarkGauge : public ScalarMetric<int64_t, TMetricKind::GAUGE
 
  private:
   FRIEND_TEST(MetricsTest, AtomicHighWaterMarkGauge);
+  friend class TmpFileMgrTest;
+
   /// Set 'hwm_value_' to 'v' if 'v' is larger than 'hwm_value_'. The entire operation is
   /// atomic.
   void UpdateMax(int64_t v) {


[impala] 03/06: IMPALA-966: Attribute type error to the right expression in an insert statement

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 30ea70a7ff00bad1c104b63083fdef06e406c595
Author: Alice Fan <fa...@gmail.com>
AuthorDate: Tue Apr 16 19:12:43 2019 -0700

    IMPALA-966: Attribute type error to the right expression in an insert
    statement
    
    Currently, if an insert statement contains multiple expressions
    that are incompatible with the column type, the error message
    returned attributes the error to the wrong expression.
    This patch makes sure the right expression is blamed. If there are
    multiple incompatible type values for the target column, then the
    error is attributed to the first widest (highest precision)
    incompatible type expression.
    
    Testing:
    - Added tests to AnalyzeStmtsTest.java
    
    Change-Id: I88718fc2cbe1a492165435a542fd2d91d8693a39
    Reviewed-on: http://gerrit.cloudera.org:8080/13050
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/analysis/Analyzer.java  | 21 +++++++++-----
 .../org/apache/impala/analysis/InsertStmt.java     | 22 +++++++++++----
 .../org/apache/impala/analysis/ModifyStmt.java     |  2 +-
 .../org/apache/impala/analysis/StatementBase.java  | 17 +++++++----
 .../java/org/apache/impala/analysis/UnionStmt.java | 12 +++++++-
 .../apache/impala/analysis/AnalyzeExprsTest.java   |  2 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   | 33 +++++++++++++++++++++-
 7 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index eded0d3..d54d02e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -2323,25 +2323,31 @@ public class Analyzer {
   /**
    * Casts the exprs in the given lists position-by-position such that for every i,
    * the i-th expr among all expr lists is compatible.
+   * Returns a list of exprs such that for every i-th expr in that list, it is the first
+   * widest compatible expression encountered among all i-th exprs in the expr lists.
+   * Returns null if an empty expression list or null is passed to it.
    * Throw an AnalysisException if the types are incompatible.
    */
-  public void castToUnionCompatibleTypes(List<List<Expr>> exprLists)
+  public List<Expr> castToUnionCompatibleTypes(List<List<Expr>> exprLists)
       throws AnalysisException {
-    if (exprLists == null || exprLists.size() < 2) return;
+    if (exprLists == null || exprLists.size() == 0) return null;
+    if (exprLists.size() == 1) return exprLists.get(0);
 
     // Determine compatible types for exprs, position by position.
     List<Expr> firstList = exprLists.get(0);
+    List<Expr> widestExprs = new ArrayList<>(firstList.size());
     for (int i = 0; i < firstList.size(); ++i) {
       // Type compatible with the i-th exprs of all expr lists.
       // Initialize with type of i-th expr in first list.
       Type compatibleType = firstList.get(i).getType();
-      // Remember last compatible expr for error reporting.
-      Expr lastCompatibleExpr = firstList.get(i);
+      widestExprs.add(firstList.get(i));
       for (int j = 1; j < exprLists.size(); ++j) {
         Preconditions.checkState(exprLists.get(j).size() == firstList.size());
-        compatibleType = getCompatibleType(compatibleType,
-            lastCompatibleExpr, exprLists.get(j).get(i));
-        lastCompatibleExpr = exprLists.get(j).get(i);
+        Type preType = compatibleType;
+        compatibleType = getCompatibleType(
+            compatibleType, widestExprs.get(i), exprLists.get(j).get(i));
+        // compatibleType will be updated if a new wider type is encountered
+        if (preType != compatibleType) widestExprs.set(i, exprLists.get(j).get(i));
       }
       // Now that we've found a compatible type, add implicit casts if necessary.
       for (int j = 0; j < exprLists.size(); ++j) {
@@ -2351,6 +2357,7 @@ public class Analyzer {
         }
       }
     }
+    return widestExprs;
   }
 
   public String getDefaultDb() { return globalState_.queryCtx.session.database; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index f3bf94c..1dcc99f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -684,12 +684,23 @@ public class InsertStmt extends StatementBase {
       kuduPartitionColumnNames = getKuduPartitionColumnNames((FeKuduTable) table_);
     }
 
+    UnionStmt unionStmt =
+        (queryStmt_ instanceof UnionStmt) ? (UnionStmt) queryStmt_ : null;
+    List<Expr> widestTypeExprList = null;
+    if (unionStmt != null && unionStmt.getWidestExprs() != null
+        && unionStmt.getWidestExprs().size() > 0) {
+      widestTypeExprList = unionStmt.getWidestExprs();
+    }
+
     // Check dynamic partition columns for type compatibility.
     for (int i = 0; i < selectListExprs.size(); ++i) {
       Column targetColumn = selectExprTargetColumns.get(i);
-      Expr compatibleExpr = checkTypeCompatibility(
-          targetTableName_.toString(), targetColumn, selectListExprs.get(i),
-          analyzer.getQueryOptions().isDecimal_v2());
+      // widestTypeExpr is widest type expression for column i
+      Expr widestTypeExpr =
+          (widestTypeExprList != null) ? widestTypeExprList.get(i) : null;
+      Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
+          targetColumn, selectListExprs.get(i), analyzer.getQueryOptions().isDecimal_v2(),
+          widestTypeExpr);
       if (targetColumn.getPosition() < numClusteringCols) {
         // This is a dynamic clustering column
         tmpPartitionKeyExprs.add(compatibleExpr);
@@ -710,9 +721,8 @@ public class InsertStmt extends StatementBase {
         if (pkv.isStatic()) {
           // tableColumns is guaranteed to exist after the earlier analysis checks
           Column tableColumn = table_.getColumn(pkv.getColName());
-          Expr compatibleExpr = checkTypeCompatibility(
-              targetTableName_.toString(), tableColumn,
-              pkv.getLiteralValue(), analyzer.isDecimalV2());
+          Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
+              tableColumn, pkv.getLiteralValue(), analyzer.isDecimalV2(), null);
           tmpPartitionKeyExprs.add(compatibleExpr);
           tmpPartitionKeyNames.add(pkv.getColName());
         }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 5d34b97..bdacc88 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -288,7 +288,7 @@ public abstract class ModifyStmt extends StatementBase {
       }
 
       rhsExpr = checkTypeCompatibility(targetTableRef_.getDesc().getTable().getFullName(),
-          c, rhsExpr, analyzer.isDecimalV2());
+          c, rhsExpr, analyzer.isDecimalV2(), null /*widestTypeSrcExpr*/);
       uniqueSlots.add(lhsSlotRef.getSlotId());
       selectList.add(new SelectListItem(rhsExpr, null));
       referencedColumns.add(colIndexMap.get(c.getName()));
diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
index 4599e1a..e0fb2de 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
@@ -193,14 +193,19 @@ public abstract class StatementBase extends StmtNode {
    * are incompatible. 'dstTableName' is only used when constructing an AnalysisException
    * message.
    *
+   * 'widestTypeSrcExpr' is the first widest type expression of the source expressions.
+   * This is only used when constructing an AnalysisException message to make sure the
+   * right expression is blamed in the error message.
+   *
    * If strictDecimal is true, only consider casts that result in no loss of information
    * when casting between decimal types.
    */
-  protected Expr checkTypeCompatibility(String dstTableName, Column dstCol,
-      Expr srcExpr, boolean strictDecimal) throws AnalysisException {
+  protected Expr checkTypeCompatibility(String dstTableName, Column dstCol, Expr srcExpr,
+      boolean strictDecimal, Expr widestTypeSrcExpr) throws AnalysisException {
     Type dstColType = dstCol.getType();
     Type srcExprType = srcExpr.getType();
 
+    if (widestTypeSrcExpr == null) widestTypeSrcExpr = srcExpr;
     // Trivially compatible, unless the type is complex.
     if (dstColType.equals(srcExprType) && !dstColType.isComplexType()) return srcExpr;
 
@@ -215,10 +220,10 @@ public abstract class StatementBase extends StmtNode {
     }
     if (!compatType.equals(dstColType) && !compatType.isNull()) {
       throw new AnalysisException(String.format(
-          "Possible loss of precision for target table '%s'.\nExpression '%s' (type: " +
-              "%s) would need to be cast to %s for column '%s'",
-          dstTableName, srcExpr.toSql(), srcExprType.toSql(), dstColType.toSql(),
-          dstCol.getName()));
+          "Possible loss of precision for target table '%s'.\nExpression '%s' (type: "
+              + "%s) would need to be cast to %s for column '%s'",
+          dstTableName, widestTypeSrcExpr.toSql(), srcExprType.toSql(),
+          dstColType.toSql(), dstCol.getName()));
     }
     return srcExpr.castTo(compatType);
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index 8438c53..f32fa4b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -157,6 +157,12 @@ public class UnionStmt extends QueryStmt {
   // (if any). Same as resultExprs_ if there is no ORDER BY.
   private List<Expr> unionResultExprs_ = new ArrayList<>();
 
+  // List of expressions produced by analyzer.castToUnionCompatibleTypes().
+  // Contains a list of exprs such that for every i-th expr in that list, it is the first
+  // widest compatible expression encountered among all i-th exprs in every result expr
+  // list of the union operands.
+  protected List<Expr> widestExprs_ = new ArrayList<>();
+
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
@@ -191,6 +197,7 @@ public class UnionStmt extends QueryStmt {
     hasAnalyticExprs_ = other.hasAnalyticExprs_;
     withClause_ = (other.withClause_ != null) ? other.withClause_.clone() : null;
     unionResultExprs_ = Expr.cloneList(other.unionResultExprs_);
+    widestExprs_ = other.widestExprs_;
   }
 
   public List<UnionOperand> getOperands() { return operands_; }
@@ -241,7 +248,7 @@ public class UnionStmt extends QueryStmt {
     for (UnionOperand op: operands_) {
       resultExprLists.add(op.getQueryStmt().getResultExprs());
     }
-    analyzer.castToUnionCompatibleTypes(resultExprLists);
+    widestExprs_ = analyzer.castToUnionCompatibleTypes(resultExprLists);
 
     // Create tuple descriptor materialized by this UnionStmt, its resultExprs, and
     // its sortInfo if necessary.
@@ -616,6 +623,8 @@ public class UnionStmt extends QueryStmt {
 
   public List<Expr> getUnionResultExprs() { return unionResultExprs_; }
 
+  public List<Expr> getWidestExprs() { return widestExprs_; }
+
   @Override
   public UnionStmt clone() { return new UnionStmt(this); }
 
@@ -637,5 +646,6 @@ public class UnionStmt extends QueryStmt {
     toSqlString_ = null;
     hasAnalyticExprs_ = false;
     unionResultExprs_.clear();
+    widestExprs_ = null;
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index d37b1ca..62cb502 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -396,7 +396,7 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     AnalysisError("insert into functional.alltypesinsert (tinyint_col, year, month) " +
         "values(CAST(999 AS DECIMAL(3,0)), 1, 1)",
         "Possible loss of precision for target table 'functional.alltypesinsert'.\n" +
-        "Expression 'cast(999 as decimal(3,0))' (type: DECIMAL(3,0)) would need to be " +
+        "Expression 'CAST(999 AS DECIMAL(3,0))' (type: DECIMAL(3,0)) would need to be " +
         "cast to TINYINT for column 'tinyint_col'");
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 8dca67d..9b42e3e 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -3399,6 +3399,37 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     AnalyzesOk("insert into d.flt select cast(1 as decimal(20, 10))", decimalV1Ctx);
     AnalyzesOk("insert into d.flt select cast(1 as decimal(20, 10))", decimalV2Ctx);
+
+    // IMPALA-966: Test insertion of incompatible expressions. Error should blame the
+    // first widest (highest precision) incompatible type expression.
+    // Test insert multiple values with compatible and incompatible types into a column
+    String query = "insert into functional.testtbl (id) "
+        + "values (10), (cast(1 as float)), (cast(3 as double))";
+    AnalysisError(query,
+        "Possible loss of precision "
+            + "for target table 'functional.testtbl'.\n"
+            + "Expression 'CAST(3 AS DOUBLE)' (type: DOUBLE) "
+            + "would need to be cast to BIGINT for column 'id'");
+    // Test insert multiple values with the same incompatible type into a column
+    query = "insert into functional.testtbl (id) "
+        + "values (cast(1 as float)), (cast(2 as float)), (cast(3 as float))";
+    AnalysisError(query,
+        "Possible loss of precision "
+            + "for target table 'functional.testtbl'.\n"
+            + "Expression 'CAST(1 AS FLOAT)' (type: FLOAT) "
+            + "would need to be cast to BIGINT for column 'id'");
+    // Test insert unions of multiple compatible and incompatible types expressions
+    // into multiple columns
+    query = "insert into functional.alltypes (int_col, float_col) "
+        + "partition(year=2019, month=4) "
+        + "(select int_col, float_col from functional.alltypes union "
+        + "select float_col, double_col from functional.alltypes union "
+        + "select double_col, int_col from functional.alltypes)";
+    AnalysisError(query,
+        "Possible loss of precision "
+            + "for target table 'functional.alltypes'.\n"
+            + "Expression 'double_col' (type: DOUBLE) "
+            + "would need to be cast to INT for column 'int_col'");
   }
 
   /**
@@ -3896,7 +3927,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
   @Test
   public void TestClone() {
     testNumberOfMembers(QueryStmt.class, 11);
-    testNumberOfMembers(UnionStmt.class, 9);
+    testNumberOfMembers(UnionStmt.class, 10);
     testNumberOfMembers(ValuesStmt.class, 0);
 
     // Also check TableRefs.


[impala] 06/06: IMPALA-8369 (part 4): Hive 3: fixes for functional dataset loading

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3567a2b5d4f797d0d48e37efc0126d022cb6a189
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri May 3 17:05:52 2019 -0700

    IMPALA-8369 (part 4): Hive 3: fixes for functional dataset loading
    
    This fixes three issues for functional dataset loading:
    
    - works around HIVE-21675, a bug in which 'CREATE VIEW IF NOT EXISTS'
      does not function correctly in our current Hive build. This has been
      fixed already, but the workaround is pretty simple, and actually the
      'drop and recreate' pattern is used more widely for data-loading than
      the 'create if not exists' one.
    
    - Moves the creation of the 'hive_index' table from
      load-dependent-tables.sql to a new load-dependent-tables-hive2.sql
      file which is only executed on Hive 2.
    
    - Moving from MR to Tez execution changed the behavior of data loading
      by disabling the auto-merging of small files. With Hive-on-MR, this
      behavior defaulted to true, but with Hive-on-Tez it defaults to false.
      The change is likely motivated by the fact that Tez automatically
      groups small splits on the _input_ side and thus is less likely to
      produce lots of small files. However, that grouping functionality
      doesn't work properly in localhost clusters (TEZ-3310) so we aren't
      seeing the benefit. So, this patch enables the post-process merging of
      small files.
    
      Prior to this change, the 'alltypesaggmultifilesnopart' test table was
      getting 40+ files inside it, which broke various planner tests. With
      the change, it gets the expected 4 files.
    
    Change-Id: Ic34930dc064da3136dde4e01a011d14db6a74ecd
    Reviewed-on: http://gerrit.cloudera.org:8080/13251
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../catalog/CatalogObjectToFromThriftTest.java     |  1 +
 fe/src/test/resources/hive-site.xml.py             | 19 +++++++++++++-
 testdata/bin/create-load-data.sh                   |  5 ++++
 testdata/bin/load-dependent-tables-hive2.sql       | 30 ++++++++++++++++++++++
 testdata/bin/load-dependent-tables.sql             | 10 ++------
 .../functional/functional_schema_template.sql      |  6 +++--
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 7c5c576..0431373 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -218,6 +218,7 @@ public class CatalogObjectToFromThriftTest {
         "Skipping this test since it is only supported when running against Hive-2",
         TestUtils.getHiveMajorVersion() == 2);
     Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl");
+    Assert.assertNotNull(table);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "hive_index_tbl");
     Assert.assertEquals(thriftTable.db_name, "functional");
diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index 65d65e4..0124a56 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -84,11 +84,28 @@ if hive_major_version >= 3:
    # We run YARN with Tez on the classpath directly
    'tez.ignore.lib.uris': 'true',
    'tez.use.cluster.hadoop-libs': 'true',
+
    # Some of the tests change the columns in a incompatible manner
    # (eg. string to timestamp) this is disallowed by default in Hive-3 which causes
    # these tests to fail. We disable this behavior in minicluster to keep running the
    # same tests on both hms-2 and hms-3
-   'hive.metastore.disallow.incompatible.col.type.changes': 'false'
+   'hive.metastore.disallow.incompatible.col.type.changes': 'false',
+
+   # Group input splits to run in a small number of mappers, and merge small
+   # files at the end of jobs if necessary, to be more similar to the legacy
+   # MR execution defaults. This helps ensure that we produce the same
+   # dataload results with Hive2-MR vs Hive3-Tez.
+   #
+   # NOTE: This currently doesn't seem to take effect on our pseudo-distributed
+   # test cluster, because the hostname is 'localhost' and some Tez code path
+   # gets triggered which ignores the min-size parameter. See TEZ-3310.
+   'tez.grouping.min-size': 256 * 1024 * 1024,
+
+   # Instead, we use post-process merging to make sure that we merge files
+   # where possible at the end of jobs.
+   # TODO(todd) re-evaluate whether this is necessary once TEZ-3310 is fixed
+   # (see above).
+   'hive.merge.tezfiles': 'true',
   })
 else:
   CONFIG.update({
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index c2122d0..74f0f63 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -390,6 +390,11 @@ function copy-and-load-dependent-tables {
   # TODO: Find a good way to integrate this with the normal data loading scripts
   beeline -n $USER -u "${JDBC_URL}" -f\
     ${IMPALA_HOME}/testdata/bin/load-dependent-tables.sql
+
+  if [[ "$IMPALA_HIVE_MAJOR_VERSION" == "2" ]]; then
+    beeline -n $USER -u "${JDBC_URL}" -f\
+      ${IMPALA_HOME}/testdata/bin/load-dependent-tables-hive2.sql
+  fi
 }
 
 function create-internal-hbase-table {
diff --git a/testdata/bin/load-dependent-tables-hive2.sql b/testdata/bin/load-dependent-tables-hive2.sql
new file mode 100644
index 0000000..0585fc6
--- /dev/null
+++ b/testdata/bin/load-dependent-tables-hive2.sql
@@ -0,0 +1,30 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Create and load tables that depend upon data in the hive test-warehouse
+-- already existing.
+--
+-- The queries in this file will only be executed on Hive 2 (and not later
+-- versions).
+
+
+USE functional;
+DROP INDEX IF EXISTS hive_index ON alltypes;
+CREATE INDEX hive_index ON TABLE alltypes (int_col)
+AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
+WITH DEFERRED REBUILD IN TABLE hive_index_tbl;
+
diff --git a/testdata/bin/load-dependent-tables.sql b/testdata/bin/load-dependent-tables.sql
index d4ff102..a75c4af 100644
--- a/testdata/bin/load-dependent-tables.sql
+++ b/testdata/bin/load-dependent-tables.sql
@@ -106,11 +106,5 @@ TBLPROPERTIES ('avro.schema.literal'='{"type":"record",
 
 ---- Unsupported Impala table types
 USE functional;
-CREATE VIEW IF NOT EXISTS hive_view AS SELECT 1 AS int_col FROM alltypes limit 1;
-
-USE functional;
-DROP INDEX IF EXISTS hive_index ON alltypes;
-CREATE INDEX hive_index ON TABLE alltypes (int_col)
-AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
-WITH DEFERRED REBUILD IN TABLE hive_index_tbl;
-
+DROP VIEW IF EXISTS hive_view;
+CREATE VIEW hive_view AS SELECT 1 AS int_col FROM alltypes limit 1;
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index f6818ff..187f478 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -993,15 +993,17 @@ functional
 alltypes_hive_view
 ---- CREATE_HIVE
 -- Test that Impala can handle incorrect column metadata created by Hive (IMPALA-994).
+DROP VIEW IF EXISTS {db_name}{db_suffix}.{table_name};
 -- Beeline cannot handle the stmt below when broken up into multiple lines.
-CREATE VIEW IF NOT EXISTS {db_name}{db_suffix}.{table_name} AS SELECT * FROM {db_name}{db_suffix}.alltypes;
+CREATE VIEW {db_name}{db_suffix}.{table_name} AS SELECT * FROM {db_name}{db_suffix}.alltypes;
 ====
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
 alltypes_view_sub
 ---- CREATE
-CREATE VIEW IF NOT EXISTS {db_name}{db_suffix}.{table_name} (x, y, z)
+DROP VIEW IF EXISTS {db_name}{db_suffix}.{table_name};
+CREATE VIEW {db_name}{db_suffix}.{table_name} (x, y, z)
 AS SELECT int_col, string_col, timestamp_col FROM {db_name}{db_suffix}.alltypes;
 ---- LOAD
 ====


[impala] 01/06: IMPALA-8369: Fixing some core tests in Hive environment

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9dd8d8241a6f3b20d4625560416498dc02498945
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Wed May 8 20:46:38 2019 +0200

    IMPALA-8369: Fixing some core tests in Hive environment
    
    Fixes:
    impala_test_suite.py:
      DROP PARTITIONS in the SETUP section of test files did
      not work with Hive 3, because 'max_parts' argument of
      hive_client.get_partition_names() was 0, while it should
      be -1 to return all partitions. The issue broke several
      'insert' tests.
      Hive 2 used to return all partitions with argument 0 too
      but Hive 3 changed this to be more consistent, see HIVE-18567.
    load_nested.py:
      query/test_mt_dop.py:test_parquet_filtering and several planner
      tests were broken because Hive 3 generates a different number of
      files for tpch_nested_parquet.customer than Hive 2. The fix is to
      split the loading of this table into two inserts on Hive 3 in order
      to produce an extra file.
    
    Change-Id: I45d9b9312c6c77f436ab020ae68c15f3c7c737de
    Reviewed-on: http://gerrit.cloudera.org:8080/13283
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 testdata/bin/load_nested.py       | 50 ++++++++++++++++++++++++++++-----------
 tests/common/environ.py           |  2 ++
 tests/common/impala_test_suite.py |  2 +-
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/testdata/bin/load_nested.py b/testdata/bin/load_nested.py
index be0dc13..d0f9066 100755
--- a/testdata/bin/load_nested.py
+++ b/testdata/bin/load_nested.py
@@ -24,6 +24,7 @@ import logging
 import os
 
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
+from tests.common.environ import HIVE_MAJOR_VERSION
 import tests.comparison.cli_options as cli_options
 
 
@@ -292,27 +293,48 @@ def load():
   # Hive is used to convert the data into parquet/orc and drop all the temp tables.
   # The Hive SET values are necessary to prevent Impala remote reads of parquet files.
   # These values are taken from http://blog.cloudera.com/blog/2014/12/the-impala-cookbook.
-  with cluster.hive.cursor(db_name=target_db) as hive:
-    LOG.info("Converting temp tables")
-    for stmt in """
-        SET mapred.min.split.size=1073741824;
-        SET parquet.block.size=10737418240;
-        SET dfs.block.size=1073741824;
-
+  create_final_tables_sql = """
+      SET mapred.min.split.size=1073741824;
+      SET parquet.block.size=10737418240;
+      SET dfs.block.size=1073741824;
+
+      CREATE TABLE region
+      STORED AS {file_format}
+      TBLPROPERTIES('{compression_key}'='{compression_value}')
+      AS SELECT * FROM tmp_region;
+
+      CREATE TABLE supplier
+      STORED AS {file_format}
+      TBLPROPERTIES('{compression_key}'='{compression_value}')
+      AS SELECT * FROM tmp_supplier;"""
+
+  # A simple CTAS for tpch_nested_parquet.customer would create 3 files with Hive3 vs
+  # 4 files with Hive2. This difference would break several tests, and it seemed
+  # easier to hack the loading of the table than to add Hive version specific behavior
+  # for each affected test. A small part of the table is inserted in a separate statement
+  # to generate the +1 file (needs hive.merge.tezfiles to avoid creating +3 files).
+  # TODO: find a less hacky way to ensure a fixed number of files
+  if HIVE_MAJOR_VERSION >= 3 and file_format == "parquet":
+    create_final_tables_sql += """
         CREATE TABLE customer
         STORED AS {file_format}
         TBLPROPERTIES('{compression_key}'='{compression_value}')
-        AS SELECT * FROM tmp_customer;
+        AS SELECT * FROM tmp_customer
+        WHERE c_custkey >= 10;
 
-        CREATE TABLE region
+        INSERT INTO customer
+        SELECT * FROM tmp_customer
+        WHERE c_custkey < 10;"""
+  else:
+    create_final_tables_sql += """
+        CREATE TABLE customer
         STORED AS {file_format}
         TBLPROPERTIES('{compression_key}'='{compression_value}')
-        AS SELECT * FROM tmp_region;
+        AS SELECT * FROM tmp_customer;"""
 
-        CREATE TABLE supplier
-        STORED AS {file_format}
-        TBLPROPERTIES('{compression_key}'='{compression_value}')
-        AS SELECT * FROM tmp_supplier;""".format(**sql_params).split(";"):
+  with cluster.hive.cursor(db_name=target_db) as hive:
+    LOG.info("Converting temp tables")
+    for stmt in create_final_tables_sql.format(**sql_params).split(";"):
       if not stmt.strip():
         continue
       LOG.info("Executing: {0}".format(stmt))
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 5b92755..30805e7 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -57,6 +57,8 @@ if docker_network_search_result is not None:
   docker_network = docker_network_search_result.groups()[0]
 IS_DOCKERIZED_TEST_CLUSTER = docker_network is not None
 
+HIVE_MAJOR_VERSION = int(os.environ.get("IMPALA_HIVE_MAJOR_VERSION"))
+
 # Resolve any symlinks in the path.
 impalad_basedir = \
     os.path.realpath(os.path.join(IMPALA_HOME, 'be/build', build_type_dir)).rstrip('/')
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 6630b3f..98e7d71 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -735,7 +735,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
   def __drop_partitions(self, db_name, table_name):
     """Drops all partitions in the given table"""
-    for partition in self.hive_client.get_partition_names(db_name, table_name, 0):
+    for partition in self.hive_client.get_partition_names(db_name, table_name, -1):
       assert self.hive_client.drop_partition_by_name(db_name, table_name, \
           partition, True), 'Could not drop partition: %s' % partition
 


[impala] 04/06: IMPALA-8369 (part 3): Hive 3: fix test_permanent_udfs.py for Hive 3 support

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bda8d95f2a9b566be9eb3d56b453ff545b7a5b6a
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri May 3 15:52:59 2019 -0700

    IMPALA-8369 (part 3): Hive 3: fix test_permanent_udfs.py for Hive 3 support
    
    This fixes two issues in test_permanent_udfs.py:
    
    - Two of Hive's built-ins were ported to the new GenericUDF interface
      which Impala can't execute. These UDFs are now excluded from the test
      when running with Hive 3.
    
    - The 'hive' commandline is deprecated nowadays, so the test now uses
      the standard HS2 approach to run Hive queries. Hive 2+ caches UDFs, so
      now that we are connecting to an already-running HS2 rather than
      starting a new standalone 'hive' command, we need to explicitly
      invalidate that cache by using 'RELOAD FUNCTION' after making changes
      to UDFs in Impala.
    
    Change-Id: I7f50845c7d4769d8843cad87988498e165902169
    Reviewed-on: http://gerrit.cloudera.org:8080/13236
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Todd Lipcon <to...@apache.org>
---
 tests/common/impala_test_suite.py           |  2 +
 tests/custom_cluster/test_permanent_udfs.py | 59 ++++++++++++-----------------
 2 files changed, 27 insertions(+), 34 deletions(-)

diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 98e7d71..fffad06 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -775,6 +775,8 @@ class ImpalaTestSuite(BaseTestSuite):
     # This should never happen.
     assert 0, 'Unable to get location for table: ' + table_name
 
+  # TODO(todd) make this use Thrift to connect to HS2 instead of shelling
+  # out to beeline for better performance
   def run_stmt_in_hive(self, stmt, username=getuser()):
     """
     Run a statement in Hive, returning stdout if successful and throwing
diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py
index 8e8819a..41b4b01 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -85,19 +85,6 @@ class TestUdfPersistence(CustomClusterTestSuite):
        % self.HIVE_IMPALA_INTEGRATION_DB)
     shutil.rmtree(self.LOCAL_LIBRARY_DIR, ignore_errors=True)
 
-  def run_stmt_in_hive(self, stmt):
-    """
-    Run a statement in Hive, returning stdout if successful and throwing
-    RuntimeError(stderr) if not.
-    """
-    call = subprocess.Popen(
-        ['hive', '-e', stmt], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    (stdout, stderr) = call.communicate()
-    call.wait()
-    if call.returncode != 0:
-      raise RuntimeError(stderr)
-    return stdout
-
   def __load_drop_functions(self, template, database, location):
     queries = template.format(database=database, location=location)
     # Split queries and remove empty lines
@@ -160,6 +147,13 @@ class TestUdfPersistence(CustomClusterTestSuite):
         db=self.HIVE_IMPALA_INTEGRATION_DB))
     assert stdout is not None and result in str(stdout.data)
 
+  def __describe_udf_in_hive(self, udf, db=HIVE_IMPALA_INTEGRATION_DB):
+    """ Describe the specified function, returning stdout. """
+    # Hive 2+ caches UDFs, so we have to explicitly invalidate the UDF if
+    # we've made changes on the Impala side.
+    stmt = "RELOAD FUNCTION ; DESCRIBE FUNCTION {0}.{1}".format(db, udf)
+    return self.run_stmt_in_hive(stmt)
+
   @SkipIfIsilon.hive
   @SkipIfS3.hive
   @SkipIfABFS.hive
@@ -203,21 +197,22 @@ class TestUdfPersistence(CustomClusterTestSuite):
     # Hive has bug that doesn't display the permanent function in show functions
     # statement. So this test relies on describe function statement which prints
     # a message if the function is not present.
-    for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
+    udfs_to_test = list(self.SAMPLE_JAVA_UDFS)
+    if int(os.environ['IMPALA_HIVE_MAJOR_VERSION']) == 2:
+      udfs_to_test += self.SAMPLE_JAVA_UDFS_HIVE2_ONLY
+    for (fn, fn_symbol) in udfs_to_test:
       self.client.execute(self.DROP_JAVA_UDF_TEMPLATE.format(
           db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
       self.client.execute(self.CREATE_JAVA_UDF_TEMPLATE.format(
           db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn,
           location=self.HIVE_UDF_JAR, symbol=fn_symbol))
-      hive_stdout = self.run_stmt_in_hive("DESCRIBE FUNCTION %s.%s"
-        % (self.HIVE_IMPALA_INTEGRATION_DB, fn))
+      hive_stdout = self.__describe_udf_in_hive(fn)
       assert "does not exist" not in hive_stdout
       self.__verify_udf_in_hive(fn)
       # Drop the function from Impala and check if it reflects in Hive.
       self.client.execute(self.DROP_JAVA_UDF_TEMPLATE.format(
           db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
-      hive_stdout = self.run_stmt_in_hive("DESCRIBE FUNCTION %s.%s"
-        % (self.HIVE_IMPALA_INTEGRATION_DB, fn))
+      hive_stdout = self.__describe_udf_in_hive(fn)
       assert "does not exist" in hive_stdout
 
     # Create the same set of functions from Hive and make sure they are visible
@@ -226,12 +221,12 @@ class TestUdfPersistence(CustomClusterTestSuite):
     REFRESH_COMMANDS = ["INVALIDATE METADATA",
         "REFRESH FUNCTIONS {0}".format(self.HIVE_IMPALA_INTEGRATION_DB)]
     for refresh_command in REFRESH_COMMANDS:
-      for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
+      for (fn, fn_symbol) in udfs_to_test:
         self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format(
             db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn,
             location=self.HIVE_UDF_JAR, symbol=fn_symbol))
       self.client.execute(refresh_command)
-      for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
+      for (fn, fn_symbol) in udfs_to_test:
         result = self.client.execute("SHOW FUNCTIONS IN {0}".format(
             self.HIVE_IMPALA_INTEGRATION_DB))
         assert result is not None and len(result.data) > 0 and\
@@ -456,15 +451,13 @@ class TestUdfPersistence(CustomClusterTestSuite):
     assert "No compatible function signatures" in str(result)
     self.verify_function_count(
         "SHOW FUNCTIONS IN %s like 'badudf*'" % self.JAVA_FN_TEST_DB, 0)
-    result = self.run_stmt_in_hive("DESCRIBE FUNCTION %s.%s"
-        % (self.JAVA_FN_TEST_DB, "badudf"))
+    result = self.__describe_udf_in_hive('badudf', db=self.JAVA_FN_TEST_DB)
     assert "does not exist" in str(result)
     # Create the same function from hive and make sure Impala doesn't load any signatures.
     self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format(
         db=self.JAVA_FN_TEST_DB, function="badudf",
         location=self.JAVA_UDF_JAR, symbol="org.apache.impala.IncompatibleUdfTest"))
-    result = self.run_stmt_in_hive("DESCRIBE FUNCTION %s.%s"
-        % (self.JAVA_FN_TEST_DB, "badudf"))
+    result = self.__describe_udf_in_hive('badudf', db=self.JAVA_FN_TEST_DB)
     assert "does not exist" not in str(result)
     self.client.execute("INVALIDATE METADATA")
     self.verify_function_count(
@@ -477,8 +470,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
     # Drop the function and make sure the function if dropped from hive
     self.client.execute(self.DROP_JAVA_UDF_TEMPLATE.format(
         db=self.JAVA_FN_TEST_DB, function="badudf"))
-    result = self.run_stmt_in_hive("DESCRIBE FUNCTION %s.%s"
-        % (self.JAVA_FN_TEST_DB, "badudf"))
+    result = self.__describe_udf_in_hive('badudf', db=self.JAVA_FN_TEST_DB)
     assert "does not exist" in str(result)
 
   # Create sample UDA functions in {database} from library {location}
@@ -505,19 +497,18 @@ class TestUdfPersistence(CustomClusterTestSuite):
       ('udfbin', 'org.apache.hadoop.hive.ql.udf.UDFBin'),
       ('udfhex', 'org.apache.hadoop.hive.ql.udf.UDFHex'),
       ('udfconv', 'org.apache.hadoop.hive.ql.udf.UDFConv'),
-      # TODO UDFHour was moved from UDF to GenericUDF in Hive 3
-      # This test will fail when running against HMS-3 unless we add
-      # support for GenericUDFs to handle such cases
-      ('udfhour', 'org.apache.hadoop.hive.ql.udf.UDFHour'),
       ('udflike', 'org.apache.hadoop.hive.ql.udf.UDFLike'),
       ('udfsign', 'org.apache.hadoop.hive.ql.udf.UDFSign'),
-      # TODO UDFYear moved to GenericUDF in Hive 3
-      # This test will fail when running against HMS-3 unless we add
-      # support for GenericUDFs
-      ('udfyear', 'org.apache.hadoop.hive.ql.udf.UDFYear'),
       ('udfascii','org.apache.hadoop.hive.ql.udf.UDFAscii')
   ]
 
+  # These UDFs are available in Hive 2 but in Hive 3 are now implemented
+  # using a new GenericUDF interface that we don't support.
+  SAMPLE_JAVA_UDFS_HIVE2_ONLY = [
+      ('udfhour', 'org.apache.hadoop.hive.ql.udf.UDFHour'),
+      ('udfyear', 'org.apache.hadoop.hive.ql.udf.UDFYear'),
+  ]
+
   # Simple tests to verify java udfs in SAMPLE_JAVA_UDFS
   SAMPLE_JAVA_UDFS_TEST = {
     'udfpi' : ('{db}.udfpi()', '3.141592653589793'),