Posted to commits@drill.apache.org by dz...@apache.org on 2022/10/18 10:15:39 UTC

[drill] branch master updated: DRILL-8314: Add support for automatically retrying and disabling broken storage plugins (#2655)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 02ce64eb3f DRILL-8314: Add support for automatically retrying and disabling broken storage plugins (#2655)
02ce64eb3f is described below

commit 02ce64eb3f1d19dd7d4968383f1700c5628a4d15
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Tue Oct 18 12:15:31 2022 +0200

    DRILL-8314: Add support for automatically retrying and disabling broken storage plugins (#2655)
---
 .editorconfig                                      |  27 ++-
 .../drill/common/exceptions/UserException.java     |  46 +++--
 .../native/client/src/protobuf/UserBitShared.pb.cc | 199 +++++++++----------
 .../native/client/src/protobuf/UserBitShared.pb.h  |   7 +-
 .../drill/exec/store/druid/DruidStoragePlugin.java |  14 +-
 .../drill/exec/store/hbase/HBaseStoragePlugin.java |  16 +-
 .../drill/exec/store/hive/HiveStoragePlugin.java   |  61 +++---
 .../exec/store/jdbc/CapitalizingJdbcSchema.java    |  11 +-
 .../drill/exec/store/jdbc/DefaultJdbcDialect.java  |   2 +-
 .../drill/exec/store/jdbc/DrillJdbcConvention.java |   5 +-
 .../exec/store/jdbc/JdbcConventionFactory.java     |  10 +-
 .../drill/exec/store/jdbc/JdbcDialectFactory.java  |  37 ++--
 .../store/jdbc/JdbcInsertWriterBatchCreator.java   |   2 +-
 .../exec/store/jdbc/JdbcIntermediatePrel.java      |  11 +-
 .../jdbc/JdbcIntermediatePrelConverterRule.java    |   9 +-
 .../org/apache/drill/exec/store/jdbc/JdbcPrel.java |   9 +-
 .../drill/exec/store/jdbc/JdbcRecordWriter.java    |   8 +-
 .../drill/exec/store/jdbc/JdbcStoragePlugin.java   |  32 ++-
 .../exec/store/jdbc/JdbcTableModifyWriter.java     |   6 +-
 .../exec/store/jdbc/JdbcWriterBatchCreator.java    |  10 +-
 .../jdbc/clickhouse/ClickhouseJdbcDialect.java     |   2 +-
 .../drill/exec/store/kafka/KafkaStoragePlugin.java |  13 +-
 .../exec/store/phoenix/PhoenixStoragePlugin.java   |  14 +-
 .../org/apache/calcite/jdbc/DynamicRootSchema.java |  84 ++++++--
 .../java/org/apache/drill/exec/ExecConstants.java  |  31 +++
 .../apache/drill/exec/planner/PlannerPhase.java    |  10 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java     |  38 ++--
 .../exec/server/options/SystemOptionManager.java   |   5 +-
 .../drill/exec/store/AbstractStoragePlugin.java    |  57 +-----
 .../org/apache/drill/exec/store/StoragePlugin.java |   7 +-
 .../store/base/filter/FilterPushDownStrategy.java  |  12 +-
 .../store/ischema/InfoSchemaStoragePlugin.java     |  18 +-
 .../org/apache/drill/exec/util/FileSystemUtil.java |  11 +-
 .../java-exec/src/main/resources/drill-module.conf |   4 +-
 .../drill/exec/physical/impl/TestSchema.java       |  65 ++++++-
 .../apache/drill/exec/util/FileSystemUtilTest.java |   6 +-
 .../org/apache/drill/exec/proto/UserBitShared.java | 216 ++++++++++++---------
 protocol/src/main/protobuf/UserBitShared.proto     |   5 +
 38 files changed, 689 insertions(+), 431 deletions(-)
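
In outline, the title describes detecting a storage plugin that keeps failing, retrying it, and disabling it if the failures persist, presumably governed by the new options added to ExecConstants and drill-module.conf. The following is a schematic of that general pattern only; nothing in it is taken from the commit except the idea in its title, and the actual control flow lives mainly in DrillSqlWorker and DynamicRootSchema in the diffs below.

import java.util.concurrent.Callable;

// Schematic only: generic "retry, then disable" handling for a broken plugin.
class PluginRetrySketch {
  <V> V withPluginRetry(String pluginName, int maxAttempts, Callable<V> op) throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        return op.call();
      } catch (Exception e) {
        if (attempt >= maxAttempts) {
          disablePlugin(pluginName); // hypothetical: persist the disabled state
          throw e;
        }
        // otherwise loop and retry the operation
      }
    }
  }

  void disablePlugin(String pluginName) {
    // hypothetical stand-in for marking the plugin disabled in storage config
  }
}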

diff --git a/.editorconfig b/.editorconfig
index 856000f798..22763960b9 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -24,10 +24,36 @@ root = true
 # Default, notably applicable to Java, XML, JavaScript and Shell
 [*]
 end_of_line = lf
+trim_trailing_whitespace = true
 insert_final_newline = true
 charset = utf-8
 indent_style = space
 indent_size = 2
+max_line_length = 100
+
+[*.java]
+ij_any_spaces_around_additive_operators = true
+ij_any_spaces_around_assignment_operators = true
+ij_any_spaces_around_bitwise_operators = true
+ij_any_spaces_around_equality_operators = true
+ij_any_spaces_around_lambda_arrow = true
+ij_any_spaces_around_logical_operators = true
+ij_any_spaces_around_multiplicative_operators = true
+ij_any_spaces_around_relational_operators = true
+ij_any_spaces_around_shift_operators = true
+ij_continuation_indent_size = 2
+ij_java_if_brace_force = always
+ij_java_indent_case_from_switch = false
+ij_java_space_after_colon = true
+ij_java_space_before_colon = true
+ij_java_ternary_operation_signs_on_next_line = true
+ij_java_use_single_class_imports = true
+ij_java_wrap_long_lines = true
+ij_java_align_multiline_parameters = false
+
+# Windows scripts
+[{*.bat,*.cmd}]
+end_of_line = crlf
 
 # C and C++
 [*.{c, cpp}]
@@ -42,4 +68,3 @@ indent_size = ignore
 [**.min.js]
 indent_style = ignore
 indent_size = ignore
-
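
The ij_-prefixed properties added above are IntelliJ IDEA's EditorConfig extensions; standard EditorConfig processors ignore them. As an illustration only, Java code satisfying the new conventions (braces forced on if, ternary signs starting the next line, spaces around relational and additive operators, two-space continuation indent) would look like:

// Illustrative fragment formatted to the conventions configured above.
class FormatExample {
  int clamp(int value, int lo, int hi) {
    if (value < lo) {
      return lo;
    }
    return value > hi
      ? hi
      : value;
  }
}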
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 03eaa596f6..437dacf267 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -87,7 +87,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#SYSTEM
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -114,7 +114,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#CONNECTION
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -140,7 +140,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_READ
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -166,7 +166,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_WRITE
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -192,7 +192,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#FUNCTION
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -218,7 +218,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PARSE
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -244,7 +244,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#VALIDATION
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -270,7 +270,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PERMISSION
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -296,7 +296,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PLAN
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -304,6 +304,24 @@ public class UserException extends DrillRuntimeException {
     return new Builder(DrillPBError.ErrorType.PLAN, cause);
   }
 
+  /**
+   * Wraps the passed exception inside a plugin error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by
+   * {@link Builder#build(Logger)}
+   * instead of creating a new exception. Any added context will be added to the user exception
+   * as well.
+   *
+   * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PLUGIN
+   *
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user
+   * exception it will be returned by the builder instead of creating a new user exception
+   * @return user exception builder
+   */
+  public static Builder pluginError(final Throwable cause) {
+    return new Builder(DrillPBError.ErrorType.PLUGIN, cause);
+  }
+
   /**
    * Creates a new user exception builder .
    *
@@ -322,7 +340,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#RESOURCE
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -348,7 +366,7 @@ public class UserException extends DrillRuntimeException {
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#UNSUPPORTED_OPERATION
    *
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -376,7 +394,7 @@ public class UserException extends DrillRuntimeException {
    * Wraps an error that arises from execution due to issues in the query, in
    * the environment and so on -- anything other than "this should never occur"
    * type checks.
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -389,7 +407,7 @@ public class UserException extends DrillRuntimeException {
    * Indicates an internal validation failed or similar unexpected error. Indicates
    * the problem is likely within Drill itself rather than due to the environment,
    * query, etc.
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
@@ -408,7 +426,7 @@ public class UserException extends DrillRuntimeException {
    * error types. In practice, using this exception indicates that error handling
    * should be moved closer to the source of the exception so we can provide the
    * user with a better explanation than "something went wrong."
-   * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+   * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    */
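
A hedged sketch of how a storage plugin might raise the new PLUGIN error type through this builder. Only UserException.pluginError(), Builder.message(String, Object...) and Builder.build(Logger) come from the change above; the client and plugin name are stand-ins.

import java.io.IOException;
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PluginErrorExample {
  private static final Logger logger = LoggerFactory.getLogger(PluginErrorExample.class);

  void connectOrFail(SomeClient client, String pluginName) {
    try {
      client.connect();
    } catch (IOException e) {
      // Wraps the cause in a PLUGIN-typed UserException, or returns the
      // original UserException if the cause already is (or wraps) one.
      throw UserException.pluginError(e)
        .message("Storage plugin '%s' is unavailable", pluginName)
        .build(logger);
    }
  }

  interface SomeClient { void connect() throws IOException; }
}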
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index e5a39d62fb..273b3fc328 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -770,113 +770,114 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA
   "s.proto\032\022Coordination.proto\032\017SchemaDef.p"
   "roto\"$\n\017UserCredentials\022\021\n\tuser_name\030\001 \001"
   "(\t\"\'\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 "
-  "\001(\020\"\355\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022("
+  "\001(\020\"\371\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022("
   "\n\010endpoint\030\002 \001(\0132\026.exec.DrillbitEndpoint"
   "\0227\n\nerror_type\030\003 \001(\0162#.exec.shared.Drill"
   "PBError.ErrorType\022\017\n\007message\030\004 \001(\t\0220\n\tex"
   "ception\030\005 \001(\0132\035.exec.shared.ExceptionWra"
   "pper\0220\n\rparsing_error\030\006 \003(\0132\031.exec.share"
-  "d.ParsingError\"\362\001\n\tErrorType\022\016\n\nCONNECTI"
+  "d.ParsingError\"\376\001\n\tErrorType\022\016\n\nCONNECTI"
   "ON\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDATA_WRITE\020\002\022\014\n\010F"
   "UNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPERMISSION\020\005\022\010\n\004"
   "PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SYSTEM\020\010\022\031\n\025UNSU"
   "PPORTED_OPERATION\020\t\022\016\n\nVALIDATION\020\n\022\023\n\017E"
   "XECUTION_ERROR\020\013\022\022\n\016INTERNAL_ERROR\020\014\022\025\n\021"
-  "UNSPECIFIED_ERROR\020\r\"\246\001\n\020ExceptionWrapper"
-  "\022\027\n\017exception_class\030\001 \001(\t\022\017\n\007message\030\002 \001"
-  "(\t\022:\n\013stack_trace\030\003 \003(\0132%.exec.shared.St"
-  "ackTraceElementWrapper\022,\n\005cause\030\004 \001(\0132\035."
-  "exec.shared.ExceptionWrapper\"\205\001\n\030StackTr"
-  "aceElementWrapper\022\022\n\nclass_name\030\001 \001(\t\022\021\n"
-  "\tfile_name\030\002 \001(\t\022\023\n\013line_number\030\003 \001(\005\022\023\n"
-  "\013method_name\030\004 \001(\t\022\030\n\020is_native_method\030\005"
-  " \001(\010\"\\\n\014ParsingError\022\024\n\014start_column\030\002 \001"
-  "(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001("
-  "\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016RecordBatchDef\022\024\n"
-  "\014record_count\030\001 \001(\005\022+\n\005field\030\002 \003(\0132\034.exe"
-  "c.shared.SerializedField\022)\n!carries_two_"
-  "byte_selection_vector\030\003 \001(\010\022\033\n\023affected_"
-  "rows_count\030\004 \001(\005\"\205\001\n\010NamePart\022(\n\004type\030\001 "
-  "\001(\0162\032.exec.shared.NamePart.Type\022\014\n\004name\030"
-  "\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec.shared.NameP"
-  "art\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARRAY\020\001\"\324\001\n\017Ser"
-  "ializedField\022%\n\nmajor_type\030\001 \001(\0132\021.commo"
-  "n.MajorType\022(\n\tname_part\030\002 \001(\0132\025.exec.sh"
-  "ared.NamePart\022+\n\005child\030\003 \003(\0132\034.exec.shar"
-  "ed.SerializedField\022\023\n\013value_count\030\004 \001(\005\022"
-  "\027\n\017var_byte_length\030\005 \001(\005\022\025\n\rbuffer_lengt"
-  "h\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007node_id\030\001 \001(\005\022\030"
-  "\n\020memory_footprint\030\002 \001(\003\"\263\002\n\013QueryResult"
-  "\0228\n\013query_state\030\001 \001(\0162#.exec.shared.Quer"
-  "yResult.QueryState\022&\n\010query_id\030\002 \001(\0132\024.e"
-  "xec.shared.QueryId\022(\n\005error\030\003 \003(\0132\031.exec"
-  ".shared.DrillPBError\"\227\001\n\nQueryState\022\014\n\010S"
-  "TARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n"
-  "\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026CANCELLATION_"
-  "REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007"
-  "\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData\022&\n\010query_id"
-  "\030\001 \001(\0132\024.exec.shared.QueryId\022\021\n\trow_coun"
-  "t\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.Recor"
-  "dBatchDef\022\033\n\023affected_rows_count\030\004 \001(\005\"\330"
-  "\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001"
-  "(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.QueryRes"
-  "ult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007forem"
-  "an\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014opti"
-  "ons_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqu"
-  "eue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryProfile\022 \n\002id"
-  "\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001"
-  "(\0162\026.exec.shared.QueryType\022\r\n\005start\030\003 \001("
-  "\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 "
-  "\001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndp"
-  "oint\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryR"
-  "esult.QueryState\022\027\n\017total_fragments\030\t \001("
-  "\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020fragmen"
-  "t_profile\030\013 \003(\0132!.exec.shared.MajorFragm"
-  "entProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001"
-  "(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001"
-  "(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021"
-  " \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 "
-  "\001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 "
-  "\001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\tautoLimit\030\027 \001"
-  "(\005\022\027\n\017scanned_plugins\030\030 \003(\t\"t\n\024MajorFrag"
-  "mentProfile\022\031\n\021major_fragment_id\030\001 \001(\005\022A"
-  "\n\026minor_fragment_profile\030\002 \003(\0132!.exec.sh"
-  "ared.MinorFragmentProfile\"\350\002\n\024MinorFragm"
-  "entProfile\022)\n\005state\030\001 \001(\0162\032.exec.shared."
-  "FragmentState\022(\n\005error\030\002 \001(\0132\031.exec.shar"
-  "ed.DrillPBError\022\031\n\021minor_fragment_id\030\003 \001"
-  "(\005\0226\n\020operator_profile\030\004 \003(\0132\034.exec.shar"
-  "ed.OperatorProfile\022\022\n\nstart_time\030\005 \001(\003\022\020"
-  "\n\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n"
-  "\017max_memory_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\013"
-  "2\026.exec.DrillbitEndpoint\022\023\n\013last_update\030"
-  "\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\237\002\n\017Operato"
-  "rProfile\0221\n\rinput_profile\030\001 \003(\0132\032.exec.s"
-  "hared.StreamProfile\022\023\n\013operator_id\030\003 \001(\005"
-  "\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022\023\n\013setup_nan"
-  "os\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033peak_"
-  "local_memory_allocated\030\007 \001(\003\022(\n\006metric\030\010"
-  " \003(\0132\030.exec.shared.MetricValue\022\022\n\nwait_n"
-  "anos\030\t \001(\003\022\032\n\022operator_type_name\030\n \001(\t\"B"
-  "\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batc"
-  "hes\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValu"
-  "e\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003"
-  "\022\024\n\014double_value\030\003 \001(\001\")\n\010Registry\022\035\n\003ja"
-  "r\030\001 \003(\0132\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name"
-  "\030\001 \001(\t\022\032\n\022function_signature\030\002 \003(\t\"W\n\013Sa"
-  "slMessage\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001"
-  "(\014\022\'\n\006status\030\003 \001(\0162\027.exec.shared.SaslSta"
-  "tus*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BI"
-  "T_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001"
-  "\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION"
-  "\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207\001\n\rFragmentS"
-  "tate\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION"
-  "\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELL"
-  "ED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQUEST"
-  "ED\020\006*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\n"
-  "SASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SA"
-  "SL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apa"
-  "che.drill.exec.protoB\rUserBitSharedH\001"
+  "UNSPECIFIED_ERROR\020\r\022\n\n\006PLUGIN\020\016\"\246\001\n\020Exce"
+  "ptionWrapper\022\027\n\017exception_class\030\001 \001(\t\022\017\n"
+  "\007message\030\002 \001(\t\022:\n\013stack_trace\030\003 \003(\0132%.ex"
+  "ec.shared.StackTraceElementWrapper\022,\n\005ca"
+  "use\030\004 \001(\0132\035.exec.shared.ExceptionWrapper"
+  "\"\205\001\n\030StackTraceElementWrapper\022\022\n\nclass_n"
+  "ame\030\001 \001(\t\022\021\n\tfile_name\030\002 \001(\t\022\023\n\013line_num"
+  "ber\030\003 \001(\005\022\023\n\013method_name\030\004 \001(\t\022\030\n\020is_nat"
+  "ive_method\030\005 \001(\010\"\\\n\014ParsingError\022\024\n\014star"
+  "t_column\030\002 \001(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend"
+  "_column\030\004 \001(\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016Recor"
+  "dBatchDef\022\024\n\014record_count\030\001 \001(\005\022+\n\005field"
+  "\030\002 \003(\0132\034.exec.shared.SerializedField\022)\n!"
+  "carries_two_byte_selection_vector\030\003 \001(\010\022"
+  "\033\n\023affected_rows_count\030\004 \001(\005\"\205\001\n\010NamePar"
+  "t\022(\n\004type\030\001 \001(\0162\032.exec.shared.NamePart.T"
+  "ype\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec."
+  "shared.NamePart\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARR"
+  "AY\020\001\"\324\001\n\017SerializedField\022%\n\nmajor_type\030\001"
+  " \001(\0132\021.common.MajorType\022(\n\tname_part\030\002 \001"
+  "(\0132\025.exec.shared.NamePart\022+\n\005child\030\003 \003(\013"
+  "2\034.exec.shared.SerializedField\022\023\n\013value_"
+  "count\030\004 \001(\005\022\027\n\017var_byte_length\030\005 \001(\005\022\025\n\r"
+  "buffer_length\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007nod"
+  "e_id\030\001 \001(\005\022\030\n\020memory_footprint\030\002 \001(\003\"\263\002\n"
+  "\013QueryResult\0228\n\013query_state\030\001 \001(\0162#.exec"
+  ".shared.QueryResult.QueryState\022&\n\010query_"
+  "id\030\002 \001(\0132\024.exec.shared.QueryId\022(\n\005error\030"
+  "\003 \003(\0132\031.exec.shared.DrillPBError\"\227\001\n\nQue"
+  "ryState\022\014\n\010STARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCO"
+  "MPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026C"
+  "ANCELLATION_REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n"
+  "\tPREPARING\020\007\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData"
+  "\022&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId"
+  "\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec."
+  "shared.RecordBatchDef\022\033\n\023affected_rows_c"
+  "ount\030\004 \001(\005\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022"
+  "\r\n\005start\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.sha"
+  "red.QueryResult.QueryState\022\017\n\004user\030\004 \001(\t"
+  ":\001-\022\'\n\007foreman\030\005 \001(\0132\026.exec.DrillbitEndp"
+  "oint\022\024\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost"
+  "\030\007 \001(\001\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryP"
+  "rofile\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId"
+  "\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryType\022\r"
+  "\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001("
+  "\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec."
+  "DrillbitEndpoint\0222\n\005state\030\010 \001(\0162#.exec.s"
+  "hared.QueryResult.QueryState\022\027\n\017total_fr"
+  "agments\030\t \001(\005\022\032\n\022finished_fragments\030\n \001("
+  "\005\022;\n\020fragment_profile\030\013 \003(\0132!.exec.share"
+  "d.MajorFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022"
+  "\r\n\005error\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010"
+  "error_id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014op"
+  "tions_json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014que"
+  "ueWaitEnd\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nq"
+  "ueue_name\030\025 \001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\ta"
+  "utoLimit\030\027 \001(\005\022\027\n\017scanned_plugins\030\030 \003(\t\""
+  "t\n\024MajorFragmentProfile\022\031\n\021major_fragmen"
+  "t_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030\002 \003"
+  "(\0132!.exec.shared.MinorFragmentProfile\"\350\002"
+  "\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\0162\032."
+  "exec.shared.FragmentState\022(\n\005error\030\002 \001(\013"
+  "2\031.exec.shared.DrillPBError\022\031\n\021minor_fra"
+  "gment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 \003(\013"
+  "2\034.exec.shared.OperatorProfile\022\022\n\nstart_"
+  "time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_u"
+  "sed\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n\010en"
+  "dpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022\023\n\013"
+  "last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003"
+  "\"\237\002\n\017OperatorProfile\0221\n\rinput_profile\030\001 "
+  "\003(\0132\032.exec.shared.StreamProfile\022\023\n\013opera"
+  "tor_id\030\003 \001(\005\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022"
+  "\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 "
+  "\001(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003"
+  "\022(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricVal"
+  "ue\022\022\n\nwait_nanos\030\t \001(\003\022\032\n\022operator_type_"
+  "name\030\n \001(\t\"B\n\rStreamProfile\022\017\n\007records\030\001"
+  " \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J"
+  "\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong"
+  "_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n\010Re"
+  "gistry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar\"/\n"
+  "\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signature"
+  "\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 \001(\t"
+  "\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec.sh"
+  "ared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_CON"
+  "TROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryT"
+  "ype\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022"
+  "\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207"
+  "\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITIN"
+  "G_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020"
+  "\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLA"
+  "TION_REQUESTED\020\006*g\n\nSaslStatus\022\020\n\014SASL_U"
+  "NKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROG"
+  "RESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020"
+  "\004B.\n\033org.apache.drill.exec.protoB\rUserBi"
+  "tSharedH\001"
   ;
 static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_UserBitShared_2eproto_deps[3] = {
   &::descriptor_table_Coordination_2eproto,
@@ -885,7 +886,7 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor
 };
 static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_UserBitShared_2eproto_once;
 const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_UserBitShared_2eproto = {
-  false, false, 4437, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 
+  false, false, 4449, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 
   &descriptor_table_UserBitShared_2eproto_once, descriptor_table_UserBitShared_2eproto_deps, 3, 22,
   schemas, file_default_instances, TableStruct_UserBitShared_2eproto::offsets,
   file_level_metadata_UserBitShared_2eproto, file_level_enum_descriptors_UserBitShared_2eproto, file_level_service_descriptors_UserBitShared_2eproto,
@@ -918,6 +919,7 @@ bool DrillPBError_ErrorType_IsValid(int value) {
     case 11:
     case 12:
     case 13:
+    case 14:
       return true;
     default:
       return false;
@@ -939,6 +941,7 @@ constexpr DrillPBError_ErrorType DrillPBError::VALIDATION;
 constexpr DrillPBError_ErrorType DrillPBError::EXECUTION_ERROR;
 constexpr DrillPBError_ErrorType DrillPBError::INTERNAL_ERROR;
 constexpr DrillPBError_ErrorType DrillPBError::UNSPECIFIED_ERROR;
+constexpr DrillPBError_ErrorType DrillPBError::PLUGIN;
 constexpr DrillPBError_ErrorType DrillPBError::ErrorType_MIN;
 constexpr DrillPBError_ErrorType DrillPBError::ErrorType_MAX;
 constexpr int DrillPBError::ErrorType_ARRAYSIZE;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index a7ad1ef0d8..ffbc7cffa8 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -168,11 +168,12 @@ enum DrillPBError_ErrorType : int {
   DrillPBError_ErrorType_VALIDATION = 10,
   DrillPBError_ErrorType_EXECUTION_ERROR = 11,
   DrillPBError_ErrorType_INTERNAL_ERROR = 12,
-  DrillPBError_ErrorType_UNSPECIFIED_ERROR = 13
+  DrillPBError_ErrorType_UNSPECIFIED_ERROR = 13,
+  DrillPBError_ErrorType_PLUGIN = 14
 };
 bool DrillPBError_ErrorType_IsValid(int value);
 constexpr DrillPBError_ErrorType DrillPBError_ErrorType_ErrorType_MIN = DrillPBError_ErrorType_CONNECTION;
-constexpr DrillPBError_ErrorType DrillPBError_ErrorType_ErrorType_MAX = DrillPBError_ErrorType_UNSPECIFIED_ERROR;
+constexpr DrillPBError_ErrorType DrillPBError_ErrorType_ErrorType_MAX = DrillPBError_ErrorType_PLUGIN;
 constexpr int DrillPBError_ErrorType_ErrorType_ARRAYSIZE = DrillPBError_ErrorType_ErrorType_MAX + 1;
 
 const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* DrillPBError_ErrorType_descriptor();
@@ -794,6 +795,8 @@ class DrillPBError PROTOBUF_FINAL :
     DrillPBError_ErrorType_INTERNAL_ERROR;
   static constexpr ErrorType UNSPECIFIED_ERROR =
     DrillPBError_ErrorType_UNSPECIFIED_ERROR;
+  static constexpr ErrorType PLUGIN =
+    DrillPBError_ErrorType_PLUGIN;
   static inline bool ErrorType_IsValid(int value) {
     return DrillPBError_ErrorType_IsValid(value);
   }
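
The regenerated C++ above and below mirrors a one-value addition to the ErrorType enum in protocol/src/main/protobuf/UserBitShared.proto. On the Java side the same value surfaces as an enum constant on the generated DrillPBError class; a minimal sketch, where the dispatch itself is an assumption and only the PLUGIN constant comes from this commit:

import org.apache.drill.exec.proto.UserBitShared.DrillPBError;

class ErrorTypeExample {
  // getErrorType() is the standard protobuf-java accessor for error_type.
  boolean isPluginFailure(DrillPBError error) {
    return error.getErrorType() == DrillPBError.ErrorType.PLUGIN;
  }
}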
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
index 2e74ad14ba..45ee0a2b32 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -63,9 +64,16 @@ public class DruidStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
-    OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+  public Set<StoragePluginOptimizerRule> getOptimizerRules(
+    OptimizerRulesContext optimizerRulesContext,
+    PlannerPhase phase
+  ) {
+    switch (phase) {
+      case PHYSICAL:
+        return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+      default:
+        return ImmutableSet.of();
+    }
   }
 
   @Override
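
The same migration, from the phase-specific getPhysicalOptimizerRules/getLogicalOptimizerRules overrides to a single phase-aware getOptimizerRules(OptimizerRulesContext, PlannerPhase), repeats in the HBase, Hive, Kafka and Phoenix plugins below. A condensed fragment of the pattern for a hypothetical plugin class; only the signature and the switch-on-phase shape come from this commit, and the rule constants are placeholders:

@Override
public Set<StoragePluginOptimizerRule> getOptimizerRules(
    OptimizerRulesContext optimizerRulesContext,
    PlannerPhase phase) {
  switch (phase) {
    case LOGICAL:
      return ImmutableSet.of(MyFilterPushDownRule.INSTANCE);  // placeholder
    case PHYSICAL:
      return ImmutableSet.of(MyScanConversionRule.INSTANCE);  // placeholder
    default:
      return ImmutableSet.of();
  }
}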
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 10532d657b..64797a3546 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -75,8 +76,19 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of(HBasePushFilterIntoScan.FILTER_ON_SCAN, HBasePushFilterIntoScan.FILTER_ON_PROJECT);
+  public Set<StoragePluginOptimizerRule> getOptimizerRules(
+    OptimizerRulesContext optimizerRulesContext,
+    PlannerPhase phase
+  ) {
+    switch (phase) {
+      case PHYSICAL:
+        return ImmutableSet.of(
+          HBasePushFilterIntoScan.FILTER_ON_SCAN,
+          HBasePushFilterIntoScan.FILTER_ON_PROJECT
+        );
+      default:
+        return ImmutableSet.of();
+    }
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 5f7061ed90..c021ebca14 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -42,6 +42,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
 import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -189,38 +190,38 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getLogicalOptimizerRules(OptimizerRulesContext optimizerContext) {
-    final String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
-
-    ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
-
-    ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerContext, defaultPartitionValue));
-    ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerContext, defaultPartitionValue));
-
-    return ruleBuilder.build();
-  }
-
-  @Override
-  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
-    OptionManager options = optimizerRulesContext.getPlannerSettings().getOptions();
-    // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function
-    // once "store.parquet.reader.int96_as_timestamp" will be true by default
-    if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS) ||
-        options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
-      ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
-    }
-    if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)) {
-      try {
-        Class<?> hiveToDrillMapRDBJsonRuleClass =
-            Class.forName("org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan");
-        ruleBuilder.add((StoragePluginOptimizerRule) hiveToDrillMapRDBJsonRuleClass.getField("INSTANCE").get(null));
-      } catch (ReflectiveOperationException e) {
-        logger.warn("Current Drill build is not designed for working with Hive MapR-DB tables. " +
-            "Please disable {} option", ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
+  public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case LOGICAL:
+        final String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
+        ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
+        ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerContext, defaultPartitionValue));
+        ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerContext, defaultPartitionValue));
+        return ruleBuilder.build();
+      case PHYSICAL: {
+        ruleBuilder = ImmutableSet.builder();
+        OptionManager options = optimizerContext.getPlannerSettings().getOptions();
+        // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function
+        // once "store.parquet.reader.int96_as_timestamp" will be true by default
+        if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS) ||
+            options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
+          ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
+        }
+        if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)) {
+          try {
+            Class<?> hiveToDrillMapRDBJsonRuleClass =
+                Class.forName("org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan");
+            ruleBuilder.add((StoragePluginOptimizerRule) hiveToDrillMapRDBJsonRuleClass.getField("INSTANCE").get(null));
+          } catch (ReflectiveOperationException e) {
+            logger.warn("Current Drill build is not designed for working with Hive MapR-DB tables. " +
+                "Please disable {} option", ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
+          }
+        }
+        return ruleBuilder.build();
       }
+      default:
+        return ImmutableSet.of();
     }
-    return ruleBuilder.build();
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index 713653e743..c9e3d83307 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -33,6 +33,7 @@ import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
@@ -152,10 +153,12 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
     }
 
     List<String> names = getFullTablePath(tableName);
-    SqlDialect dialect = plugin.getDialect(inner.getDataSource());
-    String dropTableQuery = SqlDropTable.OPERATOR.createCall(
-        SqlParserPos.ZERO, new SqlIdentifier(names, SqlParserPos.ZERO), SqlLiteral.createBoolean(false, SqlParserPos.ZERO))
-      .toSqlString(dialect).getSql();
+    SqlCall dropCall = SqlDropTable.OPERATOR.createCall(
+      SqlParserPos.ZERO,
+      new SqlIdentifier(names, SqlParserPos.ZERO),
+      SqlLiteral.createBoolean(false, SqlParserPos.ZERO)
+    );
+    String dropTableQuery = dropCall.toSqlString(inner.dialect).getSql();
 
     try (Connection conn = inner.getDataSource().getConnection();
          Statement stmt = conn.createStatement()) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
index c8ef31eb77..bb4be514da 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
@@ -47,7 +47,7 @@ public class DefaultJdbcDialect implements JdbcDialect {
       return;
     }
 
-    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials());
 
     JdbcCatalogSchema schema = new JdbcCatalogSchema(
       plugin.getName(),
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index 09ccb57238..ecf946a380 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlDialect;
 import org.apache.drill.exec.planner.RuleInstance;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.enumerable.plan.DrillJdbcRuleBase;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
 import org.apache.drill.exec.store.jdbc.rules.JdbcLimitRule;
@@ -59,7 +60,7 @@ public class DrillJdbcConvention extends JdbcConvention {
   private final ImmutableSet<RelOptRule> rules;
   private final JdbcStoragePlugin plugin;
 
-  DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, String username) {
+  DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, UserCredentials userCredentials) {
     super(dialect, ConstantUntypedNull.INSTANCE, name);
     this.plugin = plugin;
 
@@ -68,7 +69,7 @@ public class DrillJdbcConvention extends JdbcConvention {
       DrillRel.DRILL_LOGICAL);
 
     ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.<RelOptRule>builder()
-      .add(new JdbcIntermediatePrelConverterRule(this, username))
+      .add(new JdbcIntermediatePrelConverterRule(this, userCredentials))
       .add(VertexDrelConverterRule.create(this))
       .add(RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE)
       .add(RuleInstance.PROJECT_REMOVE_RULE);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
index 3a774e0e36..af20823c9e 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
@@ -18,7 +18,9 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 
@@ -31,7 +33,7 @@ public class JdbcConventionFactory {
   public static final int CACHE_SIZE = 100;
   public static final Duration CACHE_TTL = Duration.ofHours(1);
 
-  private final Cache<SqlDialect, DrillJdbcConvention> cache = CacheBuilder.newBuilder()
+  private final Cache<Pair<SqlDialect, UserCredentials>, DrillJdbcConvention> cache = CacheBuilder.newBuilder()
       .maximumSize(CACHE_SIZE)
       .expireAfterAccess(CACHE_TTL)
       .build();
@@ -39,12 +41,12 @@ public class JdbcConventionFactory {
   public DrillJdbcConvention getJdbcConvention(
       JdbcStoragePlugin plugin,
       SqlDialect dialect,
-      String username) {
+      UserCredentials userCredentials) {
     try {
-      return cache.get(dialect, new Callable<DrillJdbcConvention>() {
+      return cache.get(Pair.of(dialect, userCredentials), new Callable<DrillJdbcConvention>() {
         @Override
         public DrillJdbcConvention call() {
-          return new DrillJdbcConvention(dialect, plugin.getName(), plugin, username);
+          return new DrillJdbcConvention(dialect, plugin.getName(), plugin, userCredentials);
         }
       });
     } catch (ExecutionException ex) {
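
The cache key above widens from SqlDialect to Pair<SqlDialect, UserCredentials>, presumably so that conventions built for different query users are cached separately rather than shared. A generic sketch of the composite-key pattern; everything here is illustrative except the observation that both key halves need value-based equals/hashCode, which Pair (delegating to its elements) and protobuf messages provide:

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.tuple.Pair;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class TwoPartKeyCache<A, B, V> {
  private final Cache<Pair<A, B>, V> cache = CacheBuilder.newBuilder()
      .maximumSize(100)
      .expireAfterAccess(Duration.ofHours(1))
      .build();

  V get(A a, B b, Callable<V> loader) throws ExecutionException {
    // Pair.of(a, b) builds the composite key; the loader runs only on a miss.
    return cache.get(Pair.of(a, b), loader);
  }
}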
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
index dfc2073f4f..b3eaef16ea 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
@@ -18,38 +18,33 @@
 package org.apache.drill.exec.store.jdbc;
 
 import org.apache.calcite.sql.SqlDialect;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.jdbc.clickhouse.ClickhouseJdbcDialect;
-import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
-import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 
 import java.time.Duration;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
 public class JdbcDialectFactory {
   public static final String JDBC_CLICKHOUSE_PREFIX = "jdbc:clickhouse";
   public static final int CACHE_SIZE = 100;
   public static final Duration CACHE_TTL = Duration.ofHours(1);
-
-  private final Cache<SqlDialect, JdbcDialect> cache = CacheBuilder.newBuilder()
-      .maximumSize(CACHE_SIZE)
-      .expireAfterAccess(CACHE_TTL)
-      .build();
+  private volatile JdbcDialect jdbcDialect;
 
   public JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
-    try {
-      return cache.get(dialect, new Callable<JdbcDialect>() {
-        @Override
-        public JdbcDialect call() {
-          return plugin.getConfig().getUrl().startsWith(JDBC_CLICKHOUSE_PREFIX)
-              ? new ClickhouseJdbcDialect(plugin, dialect)
-              : new DefaultJdbcDialect(plugin, dialect);
+    // Note: any given JdbcDialectFactory instance will only ever be called with
+    // a single SqlDialect so we can cache using a single member.
+    JdbcDialect jd = jdbcDialect;
+    if (jd == null) {
+      // Double checked locking using a volatile member and a local var
+      // optimisation to reduce volatile accesses.
+      synchronized (this) {
+        jd = jdbcDialect;
+        if (jd == null) {
+          jd = plugin.getConfig().getUrl().startsWith(JDBC_CLICKHOUSE_PREFIX)
+                ? new ClickhouseJdbcDialect(plugin, dialect)
+                : new DefaultJdbcDialect(plugin, dialect);
+          jdbcDialect = jd;
         }
-      });
-    } catch (ExecutionException ex) {
-      throw new DrillRuntimeException("Cannot load the requested JdbcDialect", ex);
+      }
     }
+    return jd;
   }
 }
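
The replacement above trades a Guava cache for lazy single-value initialization via double-checked locking. The idiom generalizes; a minimal sketch, with the supplied type being a placeholder:

import java.util.function.Supplier;

// Double-checked locking: one volatile read on the hot path and a lock only
// for first initialization. Safe under the Java 5+ memory model because the
// cached field is volatile.
class Lazy<T> {
  private final Supplier<T> supplier;
  private volatile T value;

  Lazy(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  T get() {
    T v = value;        // local copy avoids a second volatile read when set
    if (v == null) {
      synchronized (this) {
        v = value;      // re-check: another thread may have won the race
        if (v == null) {
          value = v = supplier.get();
        }
      }
    }
    return v;
  }
}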
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
index 1c86e6447d..5dc5d7c2de 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
@@ -49,7 +49,7 @@ public class JdbcInsertWriterBatchCreator implements BatchCreator<JdbcInsertWrit
       config,
       children.iterator().next(),
       context,
-      new JdbcTableModifyWriter(ds, config.getTableIdentifier(), config)
+      new JdbcTableModifyWriter(userCreds, config.getTableIdentifier(), config)
     );
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
index f21b986396..7cea799b17 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.SinglePrel;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 /**
@@ -35,11 +36,11 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
  * before execution can happen.
  */
 public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable {
-  private final String username;
+  private final UserCredentials userCredentials;
 
-  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, String username) {
+  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, UserCredentials userCredentials) {
     super(cluster, traits, child);
-    this.username = username;
+    this.userCredentials = userCredentials;
   }
 
   @Override
@@ -49,7 +50,7 @@ public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput(), username);
+    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput(), userCredentials);
   }
 
   @Override
@@ -64,7 +65,7 @@ public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable
 
   @Override
   public Prel finalizeRel() {
-    return new JdbcPrel(getCluster(), getTraitSet(), this, username);
+    return new JdbcPrel(getCluster(), getTraitSet(), this, userCredentials);
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
index 62250b4662..57b986667b 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
@@ -27,15 +27,16 @@ import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
 class JdbcIntermediatePrelConverterRule extends RelOptRule {
 
   private final RelTrait inTrait;
   private final RelTrait outTrait;
-  private final String username;
+  private final UserCredentials userCredentials;
 
-  public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention, String username) {
+  public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention, UserCredentials userCredentials) {
     super(
         RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
             RelOptHelper.any(RelNode.class, jdbcConvention)),
@@ -43,7 +44,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
 
     this.inTrait = DrillRel.DRILL_LOGICAL;
     this.outTrait = Prel.DRILL_PHYSICAL;
-    this.username = username;
+    this.userCredentials = userCredentials;
   }
 
   @Override
@@ -52,7 +53,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
         in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
-        in.getInput(0), username);
+        in.getInput(0), userCredentials);
     call.transformTo(jdbcIntermediatePrel);
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index b126d948a3..00753c8971 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import java.util.ArrayList;
@@ -43,12 +44,12 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
   private final String sql;
   private final double rows;
   private final DrillJdbcConvention convention;
-  private final String username;
+  private final UserCredentials userCredentials;
 
-  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel, String username) {
+  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel, UserCredentials userCredentials) {
     super(cluster, traitSet);
     final RelNode input = prel.getInput();
-    this.username = username;
+    this.userCredentials = userCredentials;
     rows = input.estimateRowCount(cluster.getMetadataQuery());
     convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
     JdbcDialect jdbcDialect = convention.getPlugin().getJdbcDialect(convention.dialect);
@@ -74,7 +75,7 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
     for (String col : rowType.getFieldNames()) {
       columns.add(SchemaPath.getSimplePath(col));
     }
-    JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows, username);
+    JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows, userCredentials.getUserName());
     return creator.addMetadata(this, output);
   }
 
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
index ebe907392a..7f767cf21b 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -39,7 +40,6 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.sql.DataSource;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -60,15 +60,15 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
   private final JdbcWriter config;
   private int recordCount;
 
-  public JdbcRecordWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
+  public JdbcRecordWriter(UserCredentials userCredentials, List<String> tableIdentifier, JdbcWriter config) {
     this.tableIdentifier = tableIdentifier;
-    this.dialect = config.getPlugin().getDialect(source);
+    this.dialect = config.getPlugin().getDialect(userCredentials);
     this.config = config;
     this.recordCount = 0;
     this.insertStatementBuilder = getInsertStatementBuilder(tableIdentifier);
 
     try {
-      this.connection = source.getConnection();
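+      // The Optional returned by getDataSource() is unwrapped directly here; a
+      // missing DataSource for these credentials surfaces as NoSuchElementException.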
+      this.connection = config.getPlugin().getDataSource(userCredentials).get().getConnection();
     } catch (SQLException e) {
       throw UserException.connectionError()
         .message("Unable to open JDBC connection for writing.")
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 8489087465..c968d518ec 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -56,6 +56,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
   private final JdbcStorageConfig jdbcStorageConfig;
   private final JdbcDialectFactory dialectFactory;
   private final JdbcConventionFactory conventionFactory;
+  private volatile SqlDialect sqlDialect;
   // DataSources for this storage config keyed on JDBC username
   private final Map<String, HikariDataSource> dataSources = new ConcurrentHashMap<>();
 
@@ -79,7 +80,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
       return;
     }
 
-    SqlDialect dialect = getDialect(dataSource.get());
+    SqlDialect dialect = getDialect(userCreds);
     getJdbcDialect(dialect).registerSchemas(config, parent);
   }
 
@@ -108,19 +109,31 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
     ));
   }
 
-  public SqlDialect getDialect(DataSource dataSource) {
-    return JdbcSchema.createDialect(
-      SqlDialectFactoryImpl.INSTANCE,
-      dataSource
-    );
+  public SqlDialect getDialect(UserCredentials userCredentials) {
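+    // The dialect is derived once from a DataSource and then cached for this
+    // plugin instance.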
+    SqlDialect sd = sqlDialect;
+    if (sd == null) {
+      // Double-checked locking on a volatile member, with a local variable
+      // to reduce volatile reads.
+      synchronized (this) {
+        sd = sqlDialect;
+        if (sd == null) {
+          sd = JdbcSchema.createDialect(
+            SqlDialectFactoryImpl.INSTANCE,
+            getDataSource(userCredentials).get()
+          );
+          sqlDialect = sd;
+        }
+      }
+    }
+    return sd;
   }
 
   public JdbcDialect getJdbcDialect(SqlDialect dialect) {
     return dialectFactory.getJdbcDialect(this, dialect);
   }
 
-  public DrillJdbcConvention getConvention(SqlDialect dialect, String username) {
-    return conventionFactory.getJdbcConvention(this, dialect, username);
+  public DrillJdbcConvention getConvention(SqlDialect dialect, UserCredentials userCredentials) {
+    return conventionFactory.getJdbcConvention(this, dialect, userCredentials);
   }
 
   @Override
@@ -151,9 +164,8 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
       case PHYSICAL: {
         UserCredentials userCreds = optimizerContext.getContextInformation().getQueryUserCredentials();
 
-        String userName = userCreds.getUserName();
         return getDataSource(userCreds)
-          .map(dataSource -> getConvention(getDialect(dataSource), userName).getRules())
+          .map(dataSource -> getConvention(getDialect(userCreds), userCreds).getRules())
           .orElse(ImmutableSet.of());
       }
       case LOGICAL_PRUNE_AND_JOIN:
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
index 3895a087cd..4cd84b2499 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
@@ -17,15 +17,15 @@
  */
 package org.apache.drill.exec.store.jdbc;
 
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.record.VectorAccessible;
 
-import javax.sql.DataSource;
 import java.util.List;
 
 public class JdbcTableModifyWriter extends JdbcRecordWriter {
 
-  public JdbcTableModifyWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
-    super(source, tableIdentifier, config);
+  public JdbcTableModifyWriter(UserCredentials userCredentials, List<String> tableIdentifier, JdbcWriter config) {
+    super(userCredentials, tableIdentifier, config);
   }
 
   @Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
index 4373ba6303..0e00e471fb 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.jdbc;
 
 import java.util.List;
 
-import javax.sql.DataSource;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -38,18 +36,12 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
     assert children != null && children.size() == 1;
 
     UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
-    DataSource ds = config.getPlugin().getDataSource(userCreds)
-      .orElseThrow(() -> new ExecutionSetupException(String.format(
-        "Query user %s could obtain a connection to %s, missing credentials?",
-        userCreds.getUserName(),
-        config.getPlugin().getName()
-      )));
 
     return new WriterRecordBatch(
       config,
       children.iterator().next(),
       context,
-      new JdbcRecordWriter(ds, config.getTableIdentifier(), config)
+      new JdbcRecordWriter(userCreds, config.getTableIdentifier(), config)
     );
   }
 }
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
index 59816f30c1..d1a18a6c26 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
@@ -51,7 +51,7 @@ public class ClickhouseJdbcDialect implements JdbcDialect {
     if (!dataSource.isPresent()) {
       return;
     }
-    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+    DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials());
 
     ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(
       plugin.getName(),
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index d80b959a04..7fb53ce3a3 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -24,6 +24,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -67,8 +68,16 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
+  public Set<StoragePluginOptimizerRule> getOptimizerRules(
+    OptimizerRulesContext optimizerRulesContext,
+    PlannerPhase phase
+  ) {
+    switch (phase) {
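+      // Filter push-down into the Kafka scan applies only during physical planning.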
+      case PHYSICAL:
+        return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
+      default:
+        return ImmutableSet.of();
+    }
   }
 
   @Override
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
index 542099565a..6beb6b3711 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
@@ -36,6 +36,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -47,6 +48,7 @@ import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
 import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
 import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tephra.shaded.com.google.common.collect.ImmutableSet;
 
 public class PhoenixStoragePlugin extends AbstractStoragePlugin {
 
@@ -88,8 +90,16 @@ public class PhoenixStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<? extends RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return convention.getRules();
+  public Set<? extends RelOptRule> getOptimizerRules(
+    OptimizerRulesContext optimizerRulesContext,
+    PlannerPhase phase
+  ) {
+    switch (phase) {
+      case PHYSICAL:
+        return convention.getRules();
+      default:
+        return ImmutableSet.of();
+    }
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
index 1f8b16262b..380f0d52a2 100644
--- a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
@@ -26,6 +26,7 @@ import org.apache.calcite.util.BuiltInMethod;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserExceptionUtils;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.alias.AliasRegistryProvider;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -37,7 +38,6 @@ import org.apache.drill.exec.store.SubSchemaWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -91,22 +91,68 @@ public class DynamicRootSchema extends DynamicSchema {
 
   public SchemaPath resolveTableAlias(String alias) {
     return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry()
-        .getUserAliases(schemaConfig.getUserName()).get(alias))
+      .getUserAliases(schemaConfig.getUserName()).get(alias))
       .map(SchemaPath::parseFromString)
       .orElse(null);
   }
 
+  private void registerSchemasWithRetry(StoragePlugin plugin) throws Exception {
+    long maxAttempts = 1 + schemaConfig
+      .getOption(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS)
+      .num_val;
+    long retryDelayMs = schemaConfig
+      .getOption(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY)
+      .num_val;
+    int attempt = 0;
+    Exception lastAttemptEx = null;
+
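+    // Make up to maxAttempts tries, sleeping retryDelayMs between failures.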
+    while (attempt++ < maxAttempts) {
+      try {
+        plugin.registerSchemas(schemaConfig, plus());
+        return;
+      } catch (Exception ex) {
+        lastAttemptEx = ex;
+        logger.warn(
+          "Attempt {} of {} to register schemas for plugin {} failed.",
+          attempt, maxAttempts, plugin,
+          ex
+        );
+
+        if (attempt < maxAttempts) {
+          logger.info(
+            "Next attempt to register schemas for plugin {} will be made in {}ms.",
+            plugin,
+            retryDelayMs
+          );
+          try {
+            Thread.sleep(retryDelayMs);
+          } catch (InterruptedException intEx) {
+            logger.warn(
+              "Interrupted while waiting to make another attempt to register " +
+                "schemas for plugin {}.",
+              plugin,
+              intEx
+            );
+          }
+        }
+      }
+    }
+
+    throw lastAttemptEx;
+  }
+
   /**
    * Loads schema factory(storage plugin) for specified {@code schemaName}
    * @param schemaName the name of the schema
    * @param caseSensitive whether matching for the schema name is case sensitive
    */
   private void loadSchemaFactory(String schemaName, boolean caseSensitive) {
+    StoragePlugin plugin = null;
     try {
       SchemaPlus schemaPlus = this.plus();
-      StoragePlugin plugin = storages.getPlugin(schemaName);
+      plugin = storages.getPlugin(schemaName);
       if (plugin != null) {
-        plugin.registerSchemas(schemaConfig, schemaPlus);
+        registerSchemasWithRetry(plugin);
         return;
       }
 
@@ -122,7 +168,7 @@ public class DynamicRootSchema extends DynamicSchema {
         SchemaPlus firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
         if (firstLevelSchema == null) {
           // register schema for this storage plugin to 'this'.
-          plugin.registerSchemas(schemaConfig, schemaPlus);
+          registerSchemasWithRetry(plugin);
           firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
         }
         // Load second level schemas for this storage plugin
@@ -142,15 +188,32 @@ public class DynamicRootSchema extends DynamicSchema {
           schemaPlus.add(wrapper.getName(), wrapper);
         }
       }
-    } catch(PluginException | IOException ex) {
-      logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
+    } catch (Exception ex) {
+      logger.error("Failed to load schema for {}", schemaName, ex);
       // We can't proceed further without a schema, throw a runtime exception.
       UserException.Builder exceptBuilder =
           UserException
-              .resourceError(ex)
-              .message("Failed to load schema for \"" + schemaName + "\"!")
-              .addContext(ex.getClass().getName() + ": " + ex.getMessage())
+              .pluginError(ex)
+              .message("Failed to load schema for %s", schemaName)
+              .addContext("%s: %s", ex.getClass().getName(), ex.getMessage())
               .addContext(UserExceptionUtils.getUserHint(ex)); //Provide hint if it exists
+
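+      // Optionally disable the failing plugin so that later queries skip it.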
+      if (schemaConfig.getOption(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE).bool_val) {
+        String msg = String.format(
+          "The plugin %s will now be disabled (see SYSTEM option %s)",
+          plugin.getName(),
+          ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE
+        );
+        exceptBuilder.addContext(msg);
+        logger.warn(msg);
+
+        try {
+          storages.setEnabled(plugin.getName(), false);
+        } catch (PluginException disableEx) {
+          logger.error("Could not disable {}", plugin.getName(), disableEx);
+        }
+      }
+
       throw exceptBuilder.build(logger);
     }
   }
@@ -187,4 +250,3 @@ public class DynamicRootSchema extends DynamicSchema {
     }
   }
 }
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 695f8b6b02..ded472e32c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValid
 import org.apache.drill.exec.server.options.TypeValidators.IntegerValidator;
 import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.MaxWidthValidator;
+import org.apache.drill.exec.server.options.TypeValidators.NonNegativeLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
@@ -1094,6 +1095,36 @@ public final class ExecConstants {
       new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. " +
         "Default is false. (Drill 1.15+)"));
 
+  public static final String STORAGE_PLUGIN_RETRY_ATTEMPTS = "storage.plugin_retry_attempts";
+  public static final LongValidator STORAGE_PLUGIN_RETRY_ATTEMPTS_VALIDATOR = new NonNegativeLongValidator(
+    STORAGE_PLUGIN_RETRY_ATTEMPTS,
+    10,
+    new OptionDescription(
+      "The maximum number of retries that will be attempted to request metadata " +
+        "for query planning from a storage plugin."
+    )
+  );
+  public static final String STORAGE_PLUGIN_RETRY_DELAY = "storage.plugin_retry_attempt_delay";
+  public static final LongValidator STORAGE_PLUGIN_RETRY_DELAY_VALIDATOR = new NonNegativeLongValidator(
+    STORAGE_PLUGIN_RETRY_DELAY,
+    5 * 1000,
+    new OptionDescription(String.format(
+      "The delay in milliseconds between repeated attempts to request metadata " +
+        "for query planning from a storage plugin (see %s).",
+        STORAGE_PLUGIN_RETRY_ATTEMPTS
+    ))
+  );
+  public static final String STORAGE_PLUGIN_AUTO_DISABLE = "storage.plugin_auto_disable";
+  public static final BooleanValidator STORAGE_PLUGIN_AUTO_DISABLE_VALIDATOR = new BooleanValidator(
+    STORAGE_PLUGIN_AUTO_DISABLE,
+    new OptionDescription(String.format(
+      "Controls whether a storage plugin will automatically be disabled after " +
+        "the configured number of attempts to request metadata for query " +
+        " planning from it have failed (see %s)",
+      STORAGE_PLUGIN_RETRY_ATTEMPTS
+    ))
+  );
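+  // Taken together: up to 1 + storage.plugin_retry_attempts tries spaced by the
+  // retry delay, after which the plugin may be disabled automatically.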
+
   public static final String RETURN_RESULT_SET_FOR_DDL = "exec.query.return_result_set_for_ddl";
   public static final BooleanValidator RETURN_RESULT_SET_FOR_DDL_VALIDATOR = new BooleanValidator(RETURN_RESULT_SET_FOR_DDL,
       new OptionDescription("Controls whether to return result set for CREATE TABLE / VIEW / FUNCTION, DROP TABLE / VIEW / FUNCTION, " +
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index a0f8024fea..00ff1fc704 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -93,7 +93,6 @@ import org.apache.drill.exec.planner.physical.UnnestPrule;
 import org.apache.drill.exec.planner.physical.ValuesPrule;
 import org.apache.drill.exec.planner.physical.WindowPrule;
 import org.apache.drill.exec.planner.physical.WriterPrule;
-import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.parquet.FilePushDownFilter;
 
@@ -250,16 +249,11 @@ public enum PlannerPhase {
 
   public abstract RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins);
 
-  @SuppressWarnings("deprecation")
   private static RuleSet getStorageRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins,
       PlannerPhase phase) {
     final Builder<RelOptRule> rules = ImmutableSet.builder();
-    for (StoragePlugin plugin : plugins) {
-      if (plugin instanceof AbstractStoragePlugin) {
-        rules.addAll(((AbstractStoragePlugin) plugin).getOptimizerRules(context, phase));
-      } else {
-        rules.addAll(plugin.getOptimizerRules(context));
-      }
+    for (StoragePlugin sp : plugins) {
+      rules.addAll(sp.getOptimizerRules(context, phase));
     }
     return RuleSets.ofList(rules.build());
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 1d75e6c0aa..96155158c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -53,6 +53,7 @@ import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
 import org.apache.drill.exec.planner.sql.parser.SqlSchema;
 import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
@@ -127,19 +128,34 @@ public class DrillSqlWorker {
     try {
       return getPhysicalPlan(context, sql, textPlan, retryAttempts);
     } catch (Exception e) {
-      logger.trace("There was an error during conversion into physical plan. " +
-          "Will sync remote and local function registries if needed and retry " +
-          "in case if issue was due to missing function implementation.", e);
-      // it is prohibited to retry query planning for ANALYZE statement since it changes
+      logger.trace("There was an error during conversion into physical plan.", e);
+
+      // It is prohibited to retry query planning for ANALYZE statement since it changes
       // query-level option values and will fail when rerunning with updated values
-      if (context.getFunctionRegistry().syncWithRemoteRegistry(
-              context.getDrillOperatorTable().getFunctionRegistryVersion())
-        && context.getSQLStatementType() != SqlStatementType.ANALYZE) {
-        context.reloadDrillOperatorTable();
-        logger.trace("Local function registry was synchronized with remote. Trying to find function one more time.");
-        return getPhysicalPlan(context, sql, textPlanCopy, retryAttempts);
+      boolean syncFuncsAndRetry = context.getSQLStatementType() != SqlStatementType.ANALYZE;
+
+      // If the error originated in a plugin, syncing the function registry and
+      // retrying will not help.
+      syncFuncsAndRetry &= !(e instanceof UserException &&
+        ((UserException) e).getErrorType() == DrillPBError.ErrorType.PLUGIN);
+
+      // Attempt a function registry sync
+      logger.trace("Will sync remote and local function registries if needed.");
+      int funcRegistryVer = context.getDrillOperatorTable().getFunctionRegistryVersion();
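+      // Note: &= does not short-circuit, so the sync runs even when a retry was
+      // already ruled out above.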
+      syncFuncsAndRetry &= context.getFunctionRegistry().syncWithRemoteRegistry(funcRegistryVer);
+
+      if (!syncFuncsAndRetry) {
+        // Either a retry is not wanted or the function registry could not be
+        // synced, so re-raise the original error.
+        throw e;
       }
-      throw e;
+
+      context.reloadDrillOperatorTable();
+      logger.trace(
+        "Local function registry was synchronized with remote. " +
+          "Trying to find function one more time."
+      );
+      return getPhysicalPlan(context, sql, textPlanCopy, retryAttempts);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 3e1f6a73ec..e3421b4dab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -320,7 +320,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.METASTORE_RETRIEVAL_RETRY_ATTEMPTS_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_ENABLE_MAP_SUPPORT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, false, false)),
       new OptionDefinition(ExecConstants.ENABLE_DYNAMIC_CREDIT_BASED_FC_VALIDATOR),
-      new OptionDefinition(ExecConstants.ENABLE_ALIASES_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false))
+      new OptionDefinition(ExecConstants.ENABLE_ALIASES_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+      new OptionDefinition(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+      new OptionDefinition(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+      new OptionDefinition(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false))
     };
 
     CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 2015002f54..4613693011 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -19,18 +19,13 @@ package org.apache.drill.exec.store;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.calcite.plan.RelOptRule;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.metastore.MetadataProviderManager;
-import org.apache.drill.exec.planner.PlannerPhase;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
@@ -64,54 +59,6 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     return false;
   }
 
-  /**
-   * @deprecated Marking for deprecation in next major version release. Use
-   *             {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
-   */
-  @Override
-  @Deprecated
-  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext) {
-    return ImmutableSet.of();
-  }
-
-  /**
-   * @deprecated Marking for deprecation in next major version release. Use
-   *             {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
-   */
-  @Deprecated
-  public Set<? extends RelOptRule> getLogicalOptimizerRules(OptimizerRulesContext optimizerContext) {
-    return ImmutableSet.of();
-  }
-
-  /**
-   * @deprecated Marking for deprecation in next major version release. Use
-   *             {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
-   */
-  @Deprecated
-  public Set<? extends RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    // To be backward compatible, by default call the getOptimizerRules() method.
-    return getOptimizerRules(optimizerRulesContext);
-  }
-
-  /**
-   *
-   * TODO: Move this method to {@link StoragePlugin} interface in next major version release.
-   */
-  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
-    switch (phase) {
-    case LOGICAL_PRUNE_AND_JOIN:
-    case LOGICAL_PRUNE:
-    case PARTITION_PRUNING:
-      return getLogicalOptimizerRules(optimizerContext);
-    case PHYSICAL:
-      return getPhysicalOptimizerRules(optimizerContext);
-    case LOGICAL:
-    case JOIN_PLANNING:
-    default:
-      return ImmutableSet.of();
-    }
-  }
-
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
     return getPhysicalScan(userName, selection);
@@ -162,4 +109,8 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
     return context;
   }
 
+  @Override
+  public String toString() {
+    return name;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index db4ea3a659..96b75f644c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -28,9 +28,11 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 
 /** Interface for all implementations of the storage plugins. Different implementations of the storage
  * formats will implement methods that indicate if Drill can write or read its tables from that format,
@@ -72,8 +74,9 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
    * optimizer can leverage in <i>physical</i> space. Otherwise, it should return an empty set.
    * @return an empty set or a set of plugin specific physical optimizer rules.
    */
-  @Deprecated
-  Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
+  public default Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    return ImmutableSet.of();
+  }
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java
index d0b4617f5a..6ec025712b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java
@@ -52,10 +52,14 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
  * for a particular scan.
  * <p>
  * General usage in a storage plugin: <code><pre>
- * public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
- *        OptimizerRulesContext optimizerRulesContext) {
- *   return FilterPushDownStrategy.rulesFor(optimizerRulesContext,
- *      new MyPushDownListener(...));
+ * public Set<StoragePluginOptimizerRule> getOptimizerRules(
+ *        OptimizerRulesContext optimizerRulesContext, PlannerPhase phase) {
+ *   switch (phase) {
+ *     case PHYSICAL:
+ *       return FilterPushDownStrategy.rulesFor(optimizerRulesContext,
+ *        new MyPushDownListener(...));
+ *     ...
+ *   }
  * }
  * </pre></code>
  */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index be56e11a1a..da2560edbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -126,9 +127,18 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of(
-        InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_PROJECT,
-        InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_SCAN);
+  public Set<StoragePluginOptimizerRule> getOptimizerRules(
+    OptimizerRulesContext optimizerRulesContext,
+    PlannerPhase phase
+  ) {
+    switch (phase) {
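+      // Filters are pushed into the metadata record generator during physical planning.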
+      case PHYSICAL:
+        return ImmutableSet.of(
+          InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_PROJECT,
+          InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_SCAN
+        );
+      default:
+        return ImmutableSet.of();
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
index 488f3fb35a..d0b13de2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
 public class FileSystemUtil {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemUtil.class);
-  public static final String RECURSIVE_FILE_LISTING_MAX_SIZE = "drill.exec.recursive_file_listing_max_size";
+  public static final String RECURSIVE_LISTING_PROP_NAME = "drill.exec.recursive_file_listing_max_size";
 
   /**
    * Filter that will accept all files and directories.
@@ -253,7 +253,7 @@ public class FileSystemUtil {
   private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
     ForkJoinPool pool = new ForkJoinPool();
     AtomicInteger fileCounter = new AtomicInteger(0);
-    int recursiveListingMaxSize = fs.getConf().getInt(RECURSIVE_FILE_LISTING_MAX_SIZE, 0);
+    int recursiveListingMaxSize = fs.getConf().getInt(RECURSIVE_LISTING_PROP_NAME, 0);
 
     try {
       RecursiveListing task = new RecursiveListing(
@@ -306,7 +306,7 @@ public class FileSystemUtil {
     private final Scope scope;
     private final boolean suppressExceptions;
     private final PathFilter filter;
-    // Running count of files for comparison with RECURSIVE_FILE_LISTING_MAX_SIZE
+    // Running count of files for comparison with recursiveListingMaxSize
     private final AtomicInteger fileCounter;
     private final int recursiveListingMaxSize;
     private final ForkJoinPool pool;
@@ -343,10 +343,11 @@ public class FileSystemUtil {
             throw UserException
               .resourceError()
               .message(
-                "File listing size limit of %d exceeded recursing through path %s, see JVM system property %s",
+                "File listing size limit of %d exceeded recursing through " +
+                  "path %s, see Hadoop config property %s",
                 recursiveListingMaxSize,
                 path,
-                RECURSIVE_FILE_LISTING_MAX_SIZE
+                RECURSIVE_LISTING_PROP_NAME
               )
               .build(logger);
           } finally {
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 765b8d2fe1..a1dbdeb210 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -755,6 +755,9 @@ drill.exec.options: {
     web.display_format.time: "",
     window.enable: true,
     storage.list_files_recursively: false,
+    storage.plugin_retry_attempts: 1,
+    storage.plugin_retry_attempt_delay: 2000,
+    storage.plugin_auto_disable: true,
     # ============ index plan related options ==============
     planner.use_simple_optimizer: false,
     planner.enable_index_planning: true,
@@ -773,7 +776,6 @@ drill.exec.options: {
     exec.query.return_result_set_for_ddl: true,
     exec.query.max_rows: 0,
     exec.return_result_set_for_ddl: true,
-    storage.list_files_recursively: false,
     exec.statistics.ndv_accuracy: 20,
     exec.statistics.ndv_extrapolation_bf_elements: 1000000,
     exec.statistics.ndv_extrapolation_bf_fpprobability: 10,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
index a7d27ad0e4..4a78e6436c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.mock.MockBreakageStorage;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
@@ -28,6 +30,8 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class TestSchema extends DrillTest {
 
@@ -41,21 +45,66 @@ public class TestSchema extends DrillTest {
   public static void setup() throws Exception {
     cluster = ClusterFixture.builder(dirTestWatcher).buildCustomMockStorage();
     boolean breakRegisterSchema = true;
-    // With a broken storage which will throw exception in registerSchema, every query (even on other storage)
-    // shall fail if Drill is still loading all schemas (include the broken schema) before a query.
     cluster.insertMockStorage("mock_broken", breakRegisterSchema);
     cluster.insertMockStorage("mock_good", !breakRegisterSchema);
     client = cluster.clientFixture();
   }
 
   @Test (expected = Exception.class)
-  public void testQueryBrokenStorage() throws Exception {
+  public void testQueryBrokenRegSchema() throws Exception {
+    MockBreakageStorage mbs = (MockBreakageStorage) cluster.storageRegistry().getPlugin("mock_broken");
     String sql = "SELECT id_i, name_s10 FROM `mock_broken`.`employees_5`";
+
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE, false);
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS, 2);
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY, 0);
+
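+    // Make registerSchemas throw so that the retry path is exercised.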
+    mbs.setBreakRegister(true);
     try {
       client.queryBuilder().sql(sql).run();
     } catch (Exception ex) {
-      assertTrue(ex.getMessage().contains("RESOURCE ERROR: Failed to load schema"));
+      assertTrue(
+        ex.getMessage()
+          .split(System.lineSeparator())[0]
+          .matches("^RESOURCE ERROR: Failed to load schema.*")
+      );
+
+      assertEquals(3, mbs.registerAttemptCount);
+      // The plugin should still be enabled because we set auto_disable to false.
+      assertTrue(cluster.storageRegistry().availablePlugins().contains("mock_broken"));
+      throw ex;
+    } finally {
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE);
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS);
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY);
+    }
+  }
+
+  @Test (expected = Exception.class)
+  public void testAutoDisableBrokenRegSchema() throws Exception {
+    MockBreakageStorage mbs = (MockBreakageStorage) cluster.storageRegistry().getPlugin("mock_broken");
+    String sql = "SELECT id_i, name_s10 FROM `mock_broken`.`employees_5`";
+
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE, true);
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS, 0);
+
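+    // With zero retries configured, the first failure should disable the plugin.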
+    mbs.setBreakRegister(true);
+    try {
+      client.queryBuilder().sql(sql).run();
+    } catch (Exception ex) {
+      assertTrue(
+        ex.getMessage()
+          .split(System.lineSeparator())[0]
+          .matches("^PLUGIN ERROR: Failed to load schema.*")
+      );
+
+      // The plugin should no longer be enabled because we set auto_disable to true.
+      assertFalse(cluster.storageRegistry().availablePlugins().contains("mock_broken"));
       throw ex;
+    } finally {
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE);
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS);
+      cluster.storageRegistry().setEnabled("mock_broken", true);
     }
   }
 
@@ -75,12 +124,20 @@ public class TestSchema extends DrillTest {
 
   @Test (expected = Exception.class)
   public void testUseBrokenStorage() throws Exception {
+    MockBreakageStorage mbs = (MockBreakageStorage) cluster.storageRegistry().getPlugin("mock_broken");
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE, false);
+    client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS, 0);
+
+    mbs.setBreakRegister(true);
     try {
       String use_dfs = "use mock_broken";
       client.queryBuilder().sql(use_dfs).run();
     } catch(Exception ex) {
       assertTrue(ex.getMessage().contains("RESOURCE ERROR: Failed to load schema"));
       throw ex;
+    } finally {
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE);
+      client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS);
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
index 48ee441ab6..269dbbcfa6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
@@ -206,15 +206,15 @@ public class FileSystemUtilTest extends FileSystemUtilTestBase {
   @Test // DRILL-8283
   public void testRecursiveListingMaxSize() throws IOException {
     Configuration conf = fs.getConf();
-    int oldSize = conf.getInt(FileSystemUtil.RECURSIVE_FILE_LISTING_MAX_SIZE, 0);
-    conf.setInt(FileSystemUtil.RECURSIVE_FILE_LISTING_MAX_SIZE, 5);
+    int oldSize = conf.getInt(FileSystemUtil.RECURSIVE_LISTING_PROP_NAME, 0);
+    conf.setInt(FileSystemUtil.RECURSIVE_LISTING_PROP_NAME, 5);
 
     try {
       FileSystemUtil.listAll(fs, new Path(base, "a"), true);
     } catch (UserException ex) {
       assertThat(ex.getMessage(), containsString("RESOURCE ERROR: File listing size limit"));
     } finally {
-      conf.setInt(FileSystemUtil.RECURSIVE_FILE_LISTING_MAX_SIZE, oldSize);
+      conf.setInt(FileSystemUtil.RECURSIVE_LISTING_PROP_NAME, oldSize);
     }
   }
 }
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 5071b5d793..622490b67f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2046,6 +2046,16 @@ public final class UserBitShared {
        * <code>UNSPECIFIED_ERROR = 13;</code>
        */
       UNSPECIFIED_ERROR(13),
+      /**
+       * <pre>
+       * Plugin exception
+       * - A failure has occurred within a plugin.
+       * Indicates that a plugin is misconfigured or contains a bug.
+       * </pre>
+       *
+       * <code>PLUGIN = 14;</code>
+       */
+      PLUGIN(14),
       ;
 
       /**
@@ -2188,6 +2198,16 @@ public final class UserBitShared {
        * <code>UNSPECIFIED_ERROR = 13;</code>
        */
       public static final int UNSPECIFIED_ERROR_VALUE = 13;
+      /**
+       * <pre>
+       * Plugin exception
+       * - A failure has occurred within a plugin.
+       * Indicates that a plugin is misconfigured or contains a bug.
+       * </pre>
+       *
+       * <code>PLUGIN = 14;</code>
+       */
+      public static final int PLUGIN_VALUE = 14;
 
 
       public final int getNumber() {
@@ -2224,6 +2244,7 @@ public final class UserBitShared {
           case 11: return EXECUTION_ERROR;
           case 12: return INTERNAL_ERROR;
           case 13: return UNSPECIFIED_ERROR;
+          case 14: return PLUGIN;
           default: return null;
         }
       }
@@ -28533,113 +28554,114 @@ public final class UserBitShared {
       "s.proto\032\022Coordination.proto\032\017SchemaDef.p" +
       "roto\"$\n\017UserCredentials\022\021\n\tuser_name\030\001 \001" +
       "(\t\"\'\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 " +
-      "\001(\020\"\355\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
+      "\001(\020\"\371\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
       "\n\010endpoint\030\002 \001(\0132\026.exec.DrillbitEndpoint" +
       "\0227\n\nerror_type\030\003 \001(\0162#.exec.shared.Drill" +
       "PBError.ErrorType\022\017\n\007message\030\004 \001(\t\0220\n\tex" +
       "ception\030\005 \001(\0132\035.exec.shared.ExceptionWra" +
       "pper\0220\n\rparsing_error\030\006 \003(\0132\031.exec.share" +
-      "d.ParsingError\"\362\001\n\tErrorType\022\016\n\nCONNECTI" +
+      "d.ParsingError\"\376\001\n\tErrorType\022\016\n\nCONNECTI" +
       "ON\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDATA_WRITE\020\002\022\014\n\010F" +
       "UNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPERMISSION\020\005\022\010\n\004" +
       "PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SYSTEM\020\010\022\031\n\025UNSU" +
       "PPORTED_OPERATION\020\t\022\016\n\nVALIDATION\020\n\022\023\n\017E" +
       "XECUTION_ERROR\020\013\022\022\n\016INTERNAL_ERROR\020\014\022\025\n\021" +
-      "UNSPECIFIED_ERROR\020\r\"\246\001\n\020ExceptionWrapper" +
-      "\022\027\n\017exception_class\030\001 \001(\t\022\017\n\007message\030\002 \001" +
-      "(\t\022:\n\013stack_trace\030\003 \003(\0132%.exec.shared.St" +
-      "ackTraceElementWrapper\022,\n\005cause\030\004 \001(\0132\035." +
-      "exec.shared.ExceptionWrapper\"\205\001\n\030StackTr" +
-      "aceElementWrapper\022\022\n\nclass_name\030\001 \001(\t\022\021\n" +
-      "\tfile_name\030\002 \001(\t\022\023\n\013line_number\030\003 \001(\005\022\023\n" +
-      "\013method_name\030\004 \001(\t\022\030\n\020is_native_method\030\005" +
-      " \001(\010\"\\\n\014ParsingError\022\024\n\014start_column\030\002 \001" +
-      "(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(" +
-      "\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016RecordBatchDef\022\024\n" +
-      "\014record_count\030\001 \001(\005\022+\n\005field\030\002 \003(\0132\034.exe" +
-      "c.shared.SerializedField\022)\n!carries_two_" +
-      "byte_selection_vector\030\003 \001(\010\022\033\n\023affected_" +
-      "rows_count\030\004 \001(\005\"\205\001\n\010NamePart\022(\n\004type\030\001 " +
-      "\001(\0162\032.exec.shared.NamePart.Type\022\014\n\004name\030" +
-      "\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec.shared.NameP" +
-      "art\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARRAY\020\001\"\324\001\n\017Ser" +
-      "ializedField\022%\n\nmajor_type\030\001 \001(\0132\021.commo" +
-      "n.MajorType\022(\n\tname_part\030\002 \001(\0132\025.exec.sh" +
-      "ared.NamePart\022+\n\005child\030\003 \003(\0132\034.exec.shar" +
-      "ed.SerializedField\022\023\n\013value_count\030\004 \001(\005\022" +
-      "\027\n\017var_byte_length\030\005 \001(\005\022\025\n\rbuffer_lengt" +
-      "h\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007node_id\030\001 \001(\005\022\030" +
-      "\n\020memory_footprint\030\002 \001(\003\"\263\002\n\013QueryResult" +
-      "\0228\n\013query_state\030\001 \001(\0162#.exec.shared.Quer" +
-      "yResult.QueryState\022&\n\010query_id\030\002 \001(\0132\024.e" +
-      "xec.shared.QueryId\022(\n\005error\030\003 \003(\0132\031.exec" +
-      ".shared.DrillPBError\"\227\001\n\nQueryState\022\014\n\010S" +
-      "TARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n" +
-      "\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026CANCELLATION_" +
-      "REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007" +
-      "\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData\022&\n\010query_id" +
-      "\030\001 \001(\0132\024.exec.shared.QueryId\022\021\n\trow_coun" +
-      "t\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.Recor" +
-      "dBatchDef\022\033\n\023affected_rows_count\030\004 \001(\005\"\330" +
-      "\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001" +
-      "(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.QueryRes" +
-      "ult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007forem" +
-      "an\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014opti" +
-      "ons_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqu" +
-      "eue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryProfile\022 \n\002id" +
-      "\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001" +
-      "(\0162\026.exec.shared.QueryType\022\r\n\005start\030\003 \001(" +
-      "\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 " +
-      "\001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndp" +
-      "oint\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryR" +
-      "esult.QueryState\022\027\n\017total_fragments\030\t \001(" +
-      "\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020fragmen" +
-      "t_profile\030\013 \003(\0132!.exec.shared.MajorFragm" +
-      "entProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001" +
-      "(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001" +
-      "(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021" +
-      " \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 " +
-      "\001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 " +
-      "\001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\tautoLimit\030\027 \001" +
-      "(\005\022\027\n\017scanned_plugins\030\030 \003(\t\"t\n\024MajorFrag" +
-      "mentProfile\022\031\n\021major_fragment_id\030\001 \001(\005\022A" +
-      "\n\026minor_fragment_profile\030\002 \003(\0132!.exec.sh" +
-      "ared.MinorFragmentProfile\"\350\002\n\024MinorFragm" +
-      "entProfile\022)\n\005state\030\001 \001(\0162\032.exec.shared." +
-      "FragmentState\022(\n\005error\030\002 \001(\0132\031.exec.shar" +
-      "ed.DrillPBError\022\031\n\021minor_fragment_id\030\003 \001" +
-      "(\005\0226\n\020operator_profile\030\004 \003(\0132\034.exec.shar" +
-      "ed.OperatorProfile\022\022\n\nstart_time\030\005 \001(\003\022\020" +
-      "\n\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n" +
-      "\017max_memory_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\013" +
-      "2\026.exec.DrillbitEndpoint\022\023\n\013last_update\030" +
-      "\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\237\002\n\017Operato" +
-      "rProfile\0221\n\rinput_profile\030\001 \003(\0132\032.exec.s" +
-      "hared.StreamProfile\022\023\n\013operator_id\030\003 \001(\005" +
-      "\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022\023\n\013setup_nan" +
-      "os\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033peak_" +
-      "local_memory_allocated\030\007 \001(\003\022(\n\006metric\030\010" +
-      " \003(\0132\030.exec.shared.MetricValue\022\022\n\nwait_n" +
-      "anos\030\t \001(\003\022\032\n\022operator_type_name\030\n \001(\t\"B" +
-      "\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batc" +
-      "hes\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValu" +
-      "e\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003" +
-      "\022\024\n\014double_value\030\003 \001(\001\")\n\010Registry\022\035\n\003ja" +
-      "r\030\001 \003(\0132\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name" +
-      "\030\001 \001(\t\022\032\n\022function_signature\030\002 \003(\t\"W\n\013Sa" +
-      "slMessage\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001" +
-      "(\014\022\'\n\006status\030\003 \001(\0162\027.exec.shared.SaslSta" +
-      "tus*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BI" +
-      "T_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001" +
-      "\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION" +
-      "\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207\001\n\rFragmentS" +
-      "tate\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION" +
-      "\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELL" +
-      "ED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQUEST" +
-      "ED\020\006*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\n" +
-      "SASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SA" +
-      "SL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apa" +
-      "che.drill.exec.protoB\rUserBitSharedH\001"
+      "UNSPECIFIED_ERROR\020\r\022\n\n\006PLUGIN\020\016\"\246\001\n\020Exce" +
+      "ptionWrapper\022\027\n\017exception_class\030\001 \001(\t\022\017\n" +
+      "\007message\030\002 \001(\t\022:\n\013stack_trace\030\003 \003(\0132%.ex" +
+      "ec.shared.StackTraceElementWrapper\022,\n\005ca" +
+      "use\030\004 \001(\0132\035.exec.shared.ExceptionWrapper" +
+      "\"\205\001\n\030StackTraceElementWrapper\022\022\n\nclass_n" +
+      "ame\030\001 \001(\t\022\021\n\tfile_name\030\002 \001(\t\022\023\n\013line_num" +
+      "ber\030\003 \001(\005\022\023\n\013method_name\030\004 \001(\t\022\030\n\020is_nat" +
+      "ive_method\030\005 \001(\010\"\\\n\014ParsingError\022\024\n\014star" +
+      "t_column\030\002 \001(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend" +
+      "_column\030\004 \001(\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016Recor" +
+      "dBatchDef\022\024\n\014record_count\030\001 \001(\005\022+\n\005field" +
+      "\030\002 \003(\0132\034.exec.shared.SerializedField\022)\n!" +
+      "carries_two_byte_selection_vector\030\003 \001(\010\022" +
+      "\033\n\023affected_rows_count\030\004 \001(\005\"\205\001\n\010NamePar" +
+      "t\022(\n\004type\030\001 \001(\0162\032.exec.shared.NamePart.T" +
+      "ype\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec." +
+      "shared.NamePart\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARR" +
+      "AY\020\001\"\324\001\n\017SerializedField\022%\n\nmajor_type\030\001" +
+      " \001(\0132\021.common.MajorType\022(\n\tname_part\030\002 \001" +
+      "(\0132\025.exec.shared.NamePart\022+\n\005child\030\003 \003(\013" +
+      "2\034.exec.shared.SerializedField\022\023\n\013value_" +
+      "count\030\004 \001(\005\022\027\n\017var_byte_length\030\005 \001(\005\022\025\n\r" +
+      "buffer_length\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007nod" +
+      "e_id\030\001 \001(\005\022\030\n\020memory_footprint\030\002 \001(\003\"\263\002\n" +
+      "\013QueryResult\0228\n\013query_state\030\001 \001(\0162#.exec" +
+      ".shared.QueryResult.QueryState\022&\n\010query_" +
+      "id\030\002 \001(\0132\024.exec.shared.QueryId\022(\n\005error\030" +
+      "\003 \003(\0132\031.exec.shared.DrillPBError\"\227\001\n\nQue" +
+      "ryState\022\014\n\010STARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCO" +
+      "MPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026C" +
+      "ANCELLATION_REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n" +
+      "\tPREPARING\020\007\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData" +
+      "\022&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId" +
+      "\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec." +
+      "shared.RecordBatchDef\022\033\n\023affected_rows_c" +
+      "ount\030\004 \001(\005\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022" +
+      "\r\n\005start\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.sha" +
+      "red.QueryResult.QueryState\022\017\n\004user\030\004 \001(\t" +
+      ":\001-\022\'\n\007foreman\030\005 \001(\0132\026.exec.DrillbitEndp" +
+      "oint\022\024\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost" +
+      "\030\007 \001(\001\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryP" +
+      "rofile\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId" +
+      "\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryType\022\r" +
+      "\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(" +
+      "\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec." +
+      "DrillbitEndpoint\0222\n\005state\030\010 \001(\0162#.exec.s" +
+      "hared.QueryResult.QueryState\022\027\n\017total_fr" +
+      "agments\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(" +
+      "\005\022;\n\020fragment_profile\030\013 \003(\0132!.exec.share" +
+      "d.MajorFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022" +
+      "\r\n\005error\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010" +
+      "error_id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014op" +
+      "tions_json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014que" +
+      "ueWaitEnd\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nq" +
+      "ueue_name\030\025 \001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\ta" +
+      "utoLimit\030\027 \001(\005\022\027\n\017scanned_plugins\030\030 \003(\t\"" +
+      "t\n\024MajorFragmentProfile\022\031\n\021major_fragmen" +
+      "t_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030\002 \003" +
+      "(\0132!.exec.shared.MinorFragmentProfile\"\350\002" +
+      "\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\0162\032." +
+      "exec.shared.FragmentState\022(\n\005error\030\002 \001(\013" +
+      "2\031.exec.shared.DrillPBError\022\031\n\021minor_fra" +
+      "gment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 \003(\013" +
+      "2\034.exec.shared.OperatorProfile\022\022\n\nstart_" +
+      "time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_u" +
+      "sed\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n\010en" +
+      "dpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022\023\n\013" +
+      "last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003" +
+      "\"\237\002\n\017OperatorProfile\0221\n\rinput_profile\030\001 " +
+      "\003(\0132\032.exec.shared.StreamProfile\022\023\n\013opera" +
+      "tor_id\030\003 \001(\005\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022" +
+      "\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 " +
+      "\001(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003" +
+      "\022(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricVal" +
+      "ue\022\022\n\nwait_nanos\030\t \001(\003\022\032\n\022operator_type_" +
+      "name\030\n \001(\t\"B\n\rStreamProfile\022\017\n\007records\030\001" +
+      " \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J" +
+      "\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong" +
+      "_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n\010Re" +
+      "gistry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar\"/\n" +
+      "\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signature" +
+      "\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 \001(\t" +
+      "\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec.sh" +
+      "ared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_CON" +
+      "TROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryT" +
+      "ype\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022" +
+      "\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207" +
+      "\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITIN" +
+      "G_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020" +
+      "\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLA" +
+      "TION_REQUESTED\020\006*g\n\nSaslStatus\022\020\n\014SASL_U" +
+      "NKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROG" +
+      "RESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020" +
+      "\004B.\n\033org.apache.drill.exec.protoB\rUserBi" +
+      "tSharedH\001"
     };
     descriptor = com.google.protobuf.Descriptors.FileDescriptor
       .internalBuildGeneratedFileFrom(descriptorData,
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 2ed8826db6..dafd533cdd 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -123,6 +123,11 @@ message DrillPBError{
      * closer to the cause.
      */
     UNSPECIFIED_ERROR = 13;
+    /* Plugin exception
+     * - A failure has occurred within a plugin.
+     * Indicates that the plugin is misconfigured or contains a bug.
+     */
+    PLUGIN = 14;
   }
   optional string error_id = 1; // for debug tracing purposes
   optional DrillbitEndpoint endpoint = 2;
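
For context, a minimal sketch of how a storage plugin could raise the new
PLUGIN error type. It assumes that the UserException changes in this commit
add a pluginError() builder following the pattern of Drill's existing
builders (dataReadError(), validationError(), etc.); the class, plugin name
and logger below are illustrative only, not part of this diff.

    import org.apache.drill.common.exceptions.UserException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExamplePluginErrors {
      private static final Logger logger =
          LoggerFactory.getLogger(ExamplePluginErrors.class);

      public static RuntimeException connectionFailed(String pluginName,
          Exception cause) {
        // Wrap the underlying failure in a UserException carrying
        // DrillPBError.ErrorType.PLUGIN so that the automatic retry and
        // disable logic added by DRILL-8314 can recognise it.
        // NOTE: pluginError() is assumed here by analogy with the
        // existing UserException builders.
        return UserException.pluginError(cause)
            .message("Storage plugin '%s' failed to initialize", pluginName)
            .addContext("plugin", pluginName)
            .build(logger);
      }
    }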