You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/10/18 10:15:39 UTC
[drill] branch master updated: DRILL-8314: Add support for automatically retrying and disabling broken storage plugins (#2655)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 02ce64eb3f DRILL-8314: Add support for automatically retrying and disabling broken storage plugins (#2655)
02ce64eb3f is described below
commit 02ce64eb3f1d19dd7d4968383f1700c5628a4d15
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Tue Oct 18 12:15:31 2022 +0200
DRILL-8314: Add support for automatically retrying and disabling broken storage plugins (#2655)
---
.editorconfig | 27 ++-
.../drill/common/exceptions/UserException.java | 46 +++--
.../native/client/src/protobuf/UserBitShared.pb.cc | 199 +++++++++----------
.../native/client/src/protobuf/UserBitShared.pb.h | 7 +-
.../drill/exec/store/druid/DruidStoragePlugin.java | 14 +-
.../drill/exec/store/hbase/HBaseStoragePlugin.java | 16 +-
.../drill/exec/store/hive/HiveStoragePlugin.java | 61 +++---
.../exec/store/jdbc/CapitalizingJdbcSchema.java | 11 +-
.../drill/exec/store/jdbc/DefaultJdbcDialect.java | 2 +-
.../drill/exec/store/jdbc/DrillJdbcConvention.java | 5 +-
.../exec/store/jdbc/JdbcConventionFactory.java | 10 +-
.../drill/exec/store/jdbc/JdbcDialectFactory.java | 37 ++--
.../store/jdbc/JdbcInsertWriterBatchCreator.java | 2 +-
.../exec/store/jdbc/JdbcIntermediatePrel.java | 11 +-
.../jdbc/JdbcIntermediatePrelConverterRule.java | 9 +-
.../org/apache/drill/exec/store/jdbc/JdbcPrel.java | 9 +-
.../drill/exec/store/jdbc/JdbcRecordWriter.java | 8 +-
.../drill/exec/store/jdbc/JdbcStoragePlugin.java | 32 ++-
.../exec/store/jdbc/JdbcTableModifyWriter.java | 6 +-
.../exec/store/jdbc/JdbcWriterBatchCreator.java | 10 +-
.../jdbc/clickhouse/ClickhouseJdbcDialect.java | 2 +-
.../drill/exec/store/kafka/KafkaStoragePlugin.java | 13 +-
.../exec/store/phoenix/PhoenixStoragePlugin.java | 14 +-
.../org/apache/calcite/jdbc/DynamicRootSchema.java | 84 ++++++--
.../java/org/apache/drill/exec/ExecConstants.java | 31 +++
.../apache/drill/exec/planner/PlannerPhase.java | 10 +-
.../drill/exec/planner/sql/DrillSqlWorker.java | 38 ++--
.../exec/server/options/SystemOptionManager.java | 5 +-
.../drill/exec/store/AbstractStoragePlugin.java | 57 +-----
.../org/apache/drill/exec/store/StoragePlugin.java | 7 +-
.../store/base/filter/FilterPushDownStrategy.java | 12 +-
.../store/ischema/InfoSchemaStoragePlugin.java | 18 +-
.../org/apache/drill/exec/util/FileSystemUtil.java | 11 +-
.../java-exec/src/main/resources/drill-module.conf | 4 +-
.../drill/exec/physical/impl/TestSchema.java | 65 ++++++-
.../apache/drill/exec/util/FileSystemUtilTest.java | 6 +-
.../org/apache/drill/exec/proto/UserBitShared.java | 216 ++++++++++++---------
protocol/src/main/protobuf/UserBitShared.proto | 5 +
38 files changed, 689 insertions(+), 431 deletions(-)
diff --git a/.editorconfig b/.editorconfig
index 856000f798..22763960b9 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -24,10 +24,36 @@ root = true
# Default, notably applicable to Java, XML, JavaScript and Shell
[*]
end_of_line = lf
+trim_trailing_whitespace = true
insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 2
+max_line_length = 100
+
+[*.java]
+ij_any_spaces_around_additive_operators = true
+ij_any_spaces_around_assignment_operators = true
+ij_any_spaces_around_bitwise_operators = true
+ij_any_spaces_around_equality_operators = true
+ij_any_spaces_around_lambda_arrow = true
+ij_any_spaces_around_logical_operators = true
+ij_any_spaces_around_multiplicative_operators = true
+ij_any_spaces_around_relational_operators = true
+ij_any_spaces_around_shift_operators = true
+ij_continuation_indent_size = 2
+ij_java_if_brace_force = always
+ij_java_indent_case_from_switch = false
+ij_java_space_after_colon = true
+ij_java_space_before_colon = true
+ij_java_ternary_operation_signs_on_next_line = true
+ij_java_use_single_class_imports = true
+ij_java_wrap_long_lines = true
+ij_java_align_multiline_parameters = false
+
+# Windows scripts
+[{*.bat,*.cmd}]
+end_of_line = crlf
# C and C++
[*.{c, cpp}]
@@ -42,4 +68,3 @@ indent_size = ignore
[**.min.js]
indent_style = ignore
indent_size = ignore
-
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 03eaa596f6..437dacf267 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -87,7 +87,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#SYSTEM
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -114,7 +114,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#CONNECTION
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -140,7 +140,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_READ
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -166,7 +166,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_WRITE
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -192,7 +192,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#FUNCTION
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -218,7 +218,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PARSE
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -244,7 +244,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#VALIDATION
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -270,7 +270,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PERMISSION
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -296,7 +296,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PLAN
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -304,6 +304,24 @@ public class UserException extends DrillRuntimeException {
return new Builder(DrillPBError.ErrorType.PLAN, cause);
}
+ /**
+ * Wraps the passed exception inside a plugin error.
+ * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+ * <p>If the wrapped exception is, or wraps, a user exception it will be returned by
+ * {@link Builder#build(Logger)}
+ * instead of creating a new exception. Any added context will be added to the user exception
+ * as well.
+ *
+ * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PLUGIN
+ *
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user
+ * exception it will be returned by the builder instead of creating a new user exception
+ * @return user exception builder
+ */
+ public static Builder pluginError(final Throwable cause) {
+ return new Builder(DrillPBError.ErrorType.PLUGIN, cause);
+ }
+
/**
* Creates a new user exception builder .
*
@@ -322,7 +340,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#RESOURCE
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -348,7 +366,7 @@ public class UserException extends DrillRuntimeException {
*
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#UNSUPPORTED_OPERATION
*
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -376,7 +394,7 @@ public class UserException extends DrillRuntimeException {
* Wraps an error that arises from execution due to issues in the query, in
* the environment and so on -- anything other than "this should never occur"
* type checks.
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -389,7 +407,7 @@ public class UserException extends DrillRuntimeException {
* Indicates an internal validation failed or similar unexpected error. Indicates
* the problem is likely within Drill itself rather than due to the environment,
* query, etc.
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
@@ -408,7 +426,7 @@ public class UserException extends DrillRuntimeException {
* error types. In practice, using this exception indicates that error handling
* should be moved closer to the source of the exception so we can provide the
* user with a better explanation than "something went wrong."
- * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * @param cause exception we want the user exception to wrap. If cause is, or wraps, a user exception it will be
* returned by the builder instead of creating a new user exception
* @return user exception builder
*/
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index e5a39d62fb..273b3fc328 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -770,113 +770,114 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA
"s.proto\032\022Coordination.proto\032\017SchemaDef.p"
"roto\"$\n\017UserCredentials\022\021\n\tuser_name\030\001 \001"
"(\t\"\'\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 "
- "\001(\020\"\355\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022("
+ "\001(\020\"\371\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022("
"\n\010endpoint\030\002 \001(\0132\026.exec.DrillbitEndpoint"
"\0227\n\nerror_type\030\003 \001(\0162#.exec.shared.Drill"
"PBError.ErrorType\022\017\n\007message\030\004 \001(\t\0220\n\tex"
"ception\030\005 \001(\0132\035.exec.shared.ExceptionWra"
"pper\0220\n\rparsing_error\030\006 \003(\0132\031.exec.share"
- "d.ParsingError\"\362\001\n\tErrorType\022\016\n\nCONNECTI"
+ "d.ParsingError\"\376\001\n\tErrorType\022\016\n\nCONNECTI"
"ON\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDATA_WRITE\020\002\022\014\n\010F"
"UNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPERMISSION\020\005\022\010\n\004"
"PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SYSTEM\020\010\022\031\n\025UNSU"
"PPORTED_OPERATION\020\t\022\016\n\nVALIDATION\020\n\022\023\n\017E"
"XECUTION_ERROR\020\013\022\022\n\016INTERNAL_ERROR\020\014\022\025\n\021"
- "UNSPECIFIED_ERROR\020\r\"\246\001\n\020ExceptionWrapper"
- "\022\027\n\017exception_class\030\001 \001(\t\022\017\n\007message\030\002 \001"
- "(\t\022:\n\013stack_trace\030\003 \003(\0132%.exec.shared.St"
- "ackTraceElementWrapper\022,\n\005cause\030\004 \001(\0132\035."
- "exec.shared.ExceptionWrapper\"\205\001\n\030StackTr"
- "aceElementWrapper\022\022\n\nclass_name\030\001 \001(\t\022\021\n"
- "\tfile_name\030\002 \001(\t\022\023\n\013line_number\030\003 \001(\005\022\023\n"
- "\013method_name\030\004 \001(\t\022\030\n\020is_native_method\030\005"
- " \001(\010\"\\\n\014ParsingError\022\024\n\014start_column\030\002 \001"
- "(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001("
- "\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016RecordBatchDef\022\024\n"
- "\014record_count\030\001 \001(\005\022+\n\005field\030\002 \003(\0132\034.exe"
- "c.shared.SerializedField\022)\n!carries_two_"
- "byte_selection_vector\030\003 \001(\010\022\033\n\023affected_"
- "rows_count\030\004 \001(\005\"\205\001\n\010NamePart\022(\n\004type\030\001 "
- "\001(\0162\032.exec.shared.NamePart.Type\022\014\n\004name\030"
- "\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec.shared.NameP"
- "art\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARRAY\020\001\"\324\001\n\017Ser"
- "ializedField\022%\n\nmajor_type\030\001 \001(\0132\021.commo"
- "n.MajorType\022(\n\tname_part\030\002 \001(\0132\025.exec.sh"
- "ared.NamePart\022+\n\005child\030\003 \003(\0132\034.exec.shar"
- "ed.SerializedField\022\023\n\013value_count\030\004 \001(\005\022"
- "\027\n\017var_byte_length\030\005 \001(\005\022\025\n\rbuffer_lengt"
- "h\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007node_id\030\001 \001(\005\022\030"
- "\n\020memory_footprint\030\002 \001(\003\"\263\002\n\013QueryResult"
- "\0228\n\013query_state\030\001 \001(\0162#.exec.shared.Quer"
- "yResult.QueryState\022&\n\010query_id\030\002 \001(\0132\024.e"
- "xec.shared.QueryId\022(\n\005error\030\003 \003(\0132\031.exec"
- ".shared.DrillPBError\"\227\001\n\nQueryState\022\014\n\010S"
- "TARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n"
- "\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026CANCELLATION_"
- "REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007"
- "\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData\022&\n\010query_id"
- "\030\001 \001(\0132\024.exec.shared.QueryId\022\021\n\trow_coun"
- "t\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.Recor"
- "dBatchDef\022\033\n\023affected_rows_count\030\004 \001(\005\"\330"
- "\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001"
- "(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.QueryRes"
- "ult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007forem"
- "an\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014opti"
- "ons_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqu"
- "eue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryProfile\022 \n\002id"
- "\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001"
- "(\0162\026.exec.shared.QueryType\022\r\n\005start\030\003 \001("
- "\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 "
- "\001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndp"
- "oint\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryR"
- "esult.QueryState\022\027\n\017total_fragments\030\t \001("
- "\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020fragmen"
- "t_profile\030\013 \003(\0132!.exec.shared.MajorFragm"
- "entProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001"
- "(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001"
- "(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021"
- " \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 "
- "\001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 "
- "\001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\tautoLimit\030\027 \001"
- "(\005\022\027\n\017scanned_plugins\030\030 \003(\t\"t\n\024MajorFrag"
- "mentProfile\022\031\n\021major_fragment_id\030\001 \001(\005\022A"
- "\n\026minor_fragment_profile\030\002 \003(\0132!.exec.sh"
- "ared.MinorFragmentProfile\"\350\002\n\024MinorFragm"
- "entProfile\022)\n\005state\030\001 \001(\0162\032.exec.shared."
- "FragmentState\022(\n\005error\030\002 \001(\0132\031.exec.shar"
- "ed.DrillPBError\022\031\n\021minor_fragment_id\030\003 \001"
- "(\005\0226\n\020operator_profile\030\004 \003(\0132\034.exec.shar"
- "ed.OperatorProfile\022\022\n\nstart_time\030\005 \001(\003\022\020"
- "\n\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n"
- "\017max_memory_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\013"
- "2\026.exec.DrillbitEndpoint\022\023\n\013last_update\030"
- "\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\237\002\n\017Operato"
- "rProfile\0221\n\rinput_profile\030\001 \003(\0132\032.exec.s"
- "hared.StreamProfile\022\023\n\013operator_id\030\003 \001(\005"
- "\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022\023\n\013setup_nan"
- "os\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033peak_"
- "local_memory_allocated\030\007 \001(\003\022(\n\006metric\030\010"
- " \003(\0132\030.exec.shared.MetricValue\022\022\n\nwait_n"
- "anos\030\t \001(\003\022\032\n\022operator_type_name\030\n \001(\t\"B"
- "\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batc"
- "hes\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValu"
- "e\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003"
- "\022\024\n\014double_value\030\003 \001(\001\")\n\010Registry\022\035\n\003ja"
- "r\030\001 \003(\0132\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name"
- "\030\001 \001(\t\022\032\n\022function_signature\030\002 \003(\t\"W\n\013Sa"
- "slMessage\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001"
- "(\014\022\'\n\006status\030\003 \001(\0162\027.exec.shared.SaslSta"
- "tus*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BI"
- "T_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001"
- "\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION"
- "\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207\001\n\rFragmentS"
- "tate\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION"
- "\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELL"
- "ED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQUEST"
- "ED\020\006*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\n"
- "SASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SA"
- "SL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apa"
- "che.drill.exec.protoB\rUserBitSharedH\001"
+ "UNSPECIFIED_ERROR\020\r\022\n\n\006PLUGIN\020\016\"\246\001\n\020Exce"
+ "ptionWrapper\022\027\n\017exception_class\030\001 \001(\t\022\017\n"
+ "\007message\030\002 \001(\t\022:\n\013stack_trace\030\003 \003(\0132%.ex"
+ "ec.shared.StackTraceElementWrapper\022,\n\005ca"
+ "use\030\004 \001(\0132\035.exec.shared.ExceptionWrapper"
+ "\"\205\001\n\030StackTraceElementWrapper\022\022\n\nclass_n"
+ "ame\030\001 \001(\t\022\021\n\tfile_name\030\002 \001(\t\022\023\n\013line_num"
+ "ber\030\003 \001(\005\022\023\n\013method_name\030\004 \001(\t\022\030\n\020is_nat"
+ "ive_method\030\005 \001(\010\"\\\n\014ParsingError\022\024\n\014star"
+ "t_column\030\002 \001(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend"
+ "_column\030\004 \001(\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016Recor"
+ "dBatchDef\022\024\n\014record_count\030\001 \001(\005\022+\n\005field"
+ "\030\002 \003(\0132\034.exec.shared.SerializedField\022)\n!"
+ "carries_two_byte_selection_vector\030\003 \001(\010\022"
+ "\033\n\023affected_rows_count\030\004 \001(\005\"\205\001\n\010NamePar"
+ "t\022(\n\004type\030\001 \001(\0162\032.exec.shared.NamePart.T"
+ "ype\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec."
+ "shared.NamePart\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARR"
+ "AY\020\001\"\324\001\n\017SerializedField\022%\n\nmajor_type\030\001"
+ " \001(\0132\021.common.MajorType\022(\n\tname_part\030\002 \001"
+ "(\0132\025.exec.shared.NamePart\022+\n\005child\030\003 \003(\013"
+ "2\034.exec.shared.SerializedField\022\023\n\013value_"
+ "count\030\004 \001(\005\022\027\n\017var_byte_length\030\005 \001(\005\022\025\n\r"
+ "buffer_length\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007nod"
+ "e_id\030\001 \001(\005\022\030\n\020memory_footprint\030\002 \001(\003\"\263\002\n"
+ "\013QueryResult\0228\n\013query_state\030\001 \001(\0162#.exec"
+ ".shared.QueryResult.QueryState\022&\n\010query_"
+ "id\030\002 \001(\0132\024.exec.shared.QueryId\022(\n\005error\030"
+ "\003 \003(\0132\031.exec.shared.DrillPBError\"\227\001\n\nQue"
+ "ryState\022\014\n\010STARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCO"
+ "MPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026C"
+ "ANCELLATION_REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n"
+ "\tPREPARING\020\007\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData"
+ "\022&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId"
+ "\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec."
+ "shared.RecordBatchDef\022\033\n\023affected_rows_c"
+ "ount\030\004 \001(\005\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022"
+ "\r\n\005start\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.sha"
+ "red.QueryResult.QueryState\022\017\n\004user\030\004 \001(\t"
+ ":\001-\022\'\n\007foreman\030\005 \001(\0132\026.exec.DrillbitEndp"
+ "oint\022\024\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost"
+ "\030\007 \001(\001\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryP"
+ "rofile\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId"
+ "\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryType\022\r"
+ "\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001("
+ "\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec."
+ "DrillbitEndpoint\0222\n\005state\030\010 \001(\0162#.exec.s"
+ "hared.QueryResult.QueryState\022\027\n\017total_fr"
+ "agments\030\t \001(\005\022\032\n\022finished_fragments\030\n \001("
+ "\005\022;\n\020fragment_profile\030\013 \003(\0132!.exec.share"
+ "d.MajorFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022"
+ "\r\n\005error\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010"
+ "error_id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014op"
+ "tions_json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014que"
+ "ueWaitEnd\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nq"
+ "ueue_name\030\025 \001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\ta"
+ "utoLimit\030\027 \001(\005\022\027\n\017scanned_plugins\030\030 \003(\t\""
+ "t\n\024MajorFragmentProfile\022\031\n\021major_fragmen"
+ "t_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030\002 \003"
+ "(\0132!.exec.shared.MinorFragmentProfile\"\350\002"
+ "\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\0162\032."
+ "exec.shared.FragmentState\022(\n\005error\030\002 \001(\013"
+ "2\031.exec.shared.DrillPBError\022\031\n\021minor_fra"
+ "gment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 \003(\013"
+ "2\034.exec.shared.OperatorProfile\022\022\n\nstart_"
+ "time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_u"
+ "sed\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n\010en"
+ "dpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022\023\n\013"
+ "last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003"
+ "\"\237\002\n\017OperatorProfile\0221\n\rinput_profile\030\001 "
+ "\003(\0132\032.exec.shared.StreamProfile\022\023\n\013opera"
+ "tor_id\030\003 \001(\005\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022"
+ "\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 "
+ "\001(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003"
+ "\022(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricVal"
+ "ue\022\022\n\nwait_nanos\030\t \001(\003\022\032\n\022operator_type_"
+ "name\030\n \001(\t\"B\n\rStreamProfile\022\017\n\007records\030\001"
+ " \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J"
+ "\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong"
+ "_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n\010Re"
+ "gistry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar\"/\n"
+ "\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signature"
+ "\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 \001(\t"
+ "\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec.sh"
+ "ared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_CON"
+ "TROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryT"
+ "ype\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022"
+ "\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207"
+ "\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITIN"
+ "G_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020"
+ "\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLA"
+ "TION_REQUESTED\020\006*g\n\nSaslStatus\022\020\n\014SASL_U"
+ "NKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROG"
+ "RESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020"
+ "\004B.\n\033org.apache.drill.exec.protoB\rUserBi"
+ "tSharedH\001"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_UserBitShared_2eproto_deps[3] = {
&::descriptor_table_Coordination_2eproto,
@@ -885,7 +886,7 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor
};
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_UserBitShared_2eproto_once;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_UserBitShared_2eproto = {
- false, false, 4437, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto",
+ false, false, 4449, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto",
&descriptor_table_UserBitShared_2eproto_once, descriptor_table_UserBitShared_2eproto_deps, 3, 22,
schemas, file_default_instances, TableStruct_UserBitShared_2eproto::offsets,
file_level_metadata_UserBitShared_2eproto, file_level_enum_descriptors_UserBitShared_2eproto, file_level_service_descriptors_UserBitShared_2eproto,
@@ -918,6 +919,7 @@ bool DrillPBError_ErrorType_IsValid(int value) {
case 11:
case 12:
case 13:
+ case 14:
return true;
default:
return false;
@@ -939,6 +941,7 @@ constexpr DrillPBError_ErrorType DrillPBError::VALIDATION;
constexpr DrillPBError_ErrorType DrillPBError::EXECUTION_ERROR;
constexpr DrillPBError_ErrorType DrillPBError::INTERNAL_ERROR;
constexpr DrillPBError_ErrorType DrillPBError::UNSPECIFIED_ERROR;
+constexpr DrillPBError_ErrorType DrillPBError::PLUGIN;
constexpr DrillPBError_ErrorType DrillPBError::ErrorType_MIN;
constexpr DrillPBError_ErrorType DrillPBError::ErrorType_MAX;
constexpr int DrillPBError::ErrorType_ARRAYSIZE;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index a7ad1ef0d8..ffbc7cffa8 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -168,11 +168,12 @@ enum DrillPBError_ErrorType : int {
DrillPBError_ErrorType_VALIDATION = 10,
DrillPBError_ErrorType_EXECUTION_ERROR = 11,
DrillPBError_ErrorType_INTERNAL_ERROR = 12,
- DrillPBError_ErrorType_UNSPECIFIED_ERROR = 13
+ DrillPBError_ErrorType_UNSPECIFIED_ERROR = 13,
+ DrillPBError_ErrorType_PLUGIN = 14
};
bool DrillPBError_ErrorType_IsValid(int value);
constexpr DrillPBError_ErrorType DrillPBError_ErrorType_ErrorType_MIN = DrillPBError_ErrorType_CONNECTION;
-constexpr DrillPBError_ErrorType DrillPBError_ErrorType_ErrorType_MAX = DrillPBError_ErrorType_UNSPECIFIED_ERROR;
+constexpr DrillPBError_ErrorType DrillPBError_ErrorType_ErrorType_MAX = DrillPBError_ErrorType_PLUGIN;
constexpr int DrillPBError_ErrorType_ErrorType_ARRAYSIZE = DrillPBError_ErrorType_ErrorType_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* DrillPBError_ErrorType_descriptor();
@@ -794,6 +795,8 @@ class DrillPBError PROTOBUF_FINAL :
DrillPBError_ErrorType_INTERNAL_ERROR;
static constexpr ErrorType UNSPECIFIED_ERROR =
DrillPBError_ErrorType_UNSPECIFIED_ERROR;
+ static constexpr ErrorType PLUGIN =
+ DrillPBError_ErrorType_PLUGIN;
static inline bool ErrorType_IsValid(int value) {
return DrillPBError_ErrorType_IsValid(value);
}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
index 2e74ad14ba..45ee0a2b32 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
@@ -63,9 +64,16 @@ public class DruidStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
- OptimizerRulesContext optimizerRulesContext) {
- return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+ public Set<StoragePluginOptimizerRule> getOptimizerRules(
+ OptimizerRulesContext optimizerRulesContext,
+ PlannerPhase phase
+ ) {
+ switch (phase) {
+ case PHYSICAL:
+ return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+ default:
+ return ImmutableSet.of();
+ }
}
@Override
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 10532d657b..64797a3546 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
@@ -75,8 +76,19 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return ImmutableSet.of(HBasePushFilterIntoScan.FILTER_ON_SCAN, HBasePushFilterIntoScan.FILTER_ON_PROJECT);
+ public Set<StoragePluginOptimizerRule> getOptimizerRules(
+ OptimizerRulesContext optimizerRulesContext,
+ PlannerPhase phase
+ ) {
+ switch (phase) {
+ case PHYSICAL:
+ return ImmutableSet.of(
+ HBasePushFilterIntoScan.FILTER_ON_SCAN,
+ HBasePushFilterIntoScan.FILTER_ON_PROJECT
+ );
+ default:
+ return ImmutableSet.of();
+ }
}
@Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 5f7061ed90..c021ebca14 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -42,6 +42,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
import org.apache.drill.exec.server.DrillbitContext;
@@ -189,38 +190,38 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<StoragePluginOptimizerRule> getLogicalOptimizerRules(OptimizerRulesContext optimizerContext) {
- final String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
-
- ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
-
- ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerContext, defaultPartitionValue));
- ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerContext, defaultPartitionValue));
-
- return ruleBuilder.build();
- }
-
- @Override
- public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
- OptionManager options = optimizerRulesContext.getPlannerSettings().getOptions();
- // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function
- // once "store.parquet.reader.int96_as_timestamp" will be true by default
- if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS) ||
- options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
- ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
- }
- if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)) {
- try {
- Class<?> hiveToDrillMapRDBJsonRuleClass =
- Class.forName("org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan");
- ruleBuilder.add((StoragePluginOptimizerRule) hiveToDrillMapRDBJsonRuleClass.getField("INSTANCE").get(null));
- } catch (ReflectiveOperationException e) {
- logger.warn("Current Drill build is not designed for working with Hive MapR-DB tables. " +
- "Please disable {} option", ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
+ public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+ switch (phase) {
+ case LOGICAL:
+ final String defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
+ ImmutableSet.Builder<StoragePluginOptimizerRule> ruleBuilder = ImmutableSet.builder();
+ ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnProject(optimizerContext, defaultPartitionValue));
+ ruleBuilder.add(HivePushPartitionFilterIntoScan.getFilterOnScan(optimizerContext, defaultPartitionValue));
+ return ruleBuilder.build();
+ case PHYSICAL: {
+ ruleBuilder = ImmutableSet.builder();
+ OptionManager options = optimizerContext.getPlannerSettings().getOptions();
+ // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function
+ // once "store.parquet.reader.int96_as_timestamp" will be true by default
+ if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS) ||
+ options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
+ ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
+ }
+ if (options.getBoolean(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)) {
+ try {
+ Class<?> hiveToDrillMapRDBJsonRuleClass =
+ Class.forName("org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan");
+ ruleBuilder.add((StoragePluginOptimizerRule) hiveToDrillMapRDBJsonRuleClass.getField("INSTANCE").get(null));
+ } catch (ReflectiveOperationException e) {
+ logger.warn("Current Drill build is not designed for working with Hive MapR-DB tables. " +
+ "Please disable {} option", ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
+ }
+ }
+ return ruleBuilder.build();
}
+ default:
+ return ImmutableSet.of();
}
- return ruleBuilder.build();
}
@Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
index 713653e743..c9e3d83307 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
@@ -33,6 +33,7 @@ import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
@@ -152,10 +153,12 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
}
List<String> names = getFullTablePath(tableName);
- SqlDialect dialect = plugin.getDialect(inner.getDataSource());
- String dropTableQuery = SqlDropTable.OPERATOR.createCall(
- SqlParserPos.ZERO, new SqlIdentifier(names, SqlParserPos.ZERO), SqlLiteral.createBoolean(false, SqlParserPos.ZERO))
- .toSqlString(dialect).getSql();
+ SqlCall dropCall = SqlDropTable.OPERATOR.createCall(
+ SqlParserPos.ZERO,
+ new SqlIdentifier(names, SqlParserPos.ZERO),
+ SqlLiteral.createBoolean(false, SqlParserPos.ZERO)
+ );
+ String dropTableQuery = dropCall.toSqlString(inner.dialect).getSql();
try (Connection conn = inner.getDataSource().getConnection();
Statement stmt = conn.createStatement()) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
index c8ef31eb77..bb4be514da 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DefaultJdbcDialect.java
@@ -47,7 +47,7 @@ public class DefaultJdbcDialect implements JdbcDialect {
return;
}
- DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+ DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials());
JdbcCatalogSchema schema = new JdbcCatalogSchema(
plugin.getName(),
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
index 09ccb57238..ecf946a380 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcConvention.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlDialect;
import org.apache.drill.exec.planner.RuleInstance;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.store.enumerable.plan.DrillJdbcRuleBase;
import org.apache.drill.exec.store.enumerable.plan.VertexDrelConverterRule;
import org.apache.drill.exec.store.jdbc.rules.JdbcLimitRule;
@@ -59,7 +60,7 @@ public class DrillJdbcConvention extends JdbcConvention {
private final ImmutableSet<RelOptRule> rules;
private final JdbcStoragePlugin plugin;
- DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, String username) {
+ DrillJdbcConvention(SqlDialect dialect, String name, JdbcStoragePlugin plugin, UserCredentials userCredentials) {
super(dialect, ConstantUntypedNull.INSTANCE, name);
this.plugin = plugin;
@@ -68,7 +69,7 @@ public class DrillJdbcConvention extends JdbcConvention {
DrillRel.DRILL_LOGICAL);
ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.<RelOptRule>builder()
- .add(new JdbcIntermediatePrelConverterRule(this, username))
+ .add(new JdbcIntermediatePrelConverterRule(this, userCredentials))
.add(VertexDrelConverterRule.create(this))
.add(RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE)
.add(RuleInstance.PROJECT_REMOVE_RULE);
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
index 3a774e0e36..af20823c9e 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcConventionFactory.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.store.jdbc;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
@@ -31,7 +33,7 @@ public class JdbcConventionFactory {
public static final int CACHE_SIZE = 100;
public static final Duration CACHE_TTL = Duration.ofHours(1);
- private final Cache<SqlDialect, DrillJdbcConvention> cache = CacheBuilder.newBuilder()
+ private final Cache<Pair<SqlDialect, UserCredentials>, DrillJdbcConvention> cache = CacheBuilder.newBuilder()
.maximumSize(CACHE_SIZE)
.expireAfterAccess(CACHE_TTL)
.build();
@@ -39,12 +41,12 @@ public class JdbcConventionFactory {
public DrillJdbcConvention getJdbcConvention(
JdbcStoragePlugin plugin,
SqlDialect dialect,
- String username) {
+ UserCredentials userCredentials) {
try {
- return cache.get(dialect, new Callable<DrillJdbcConvention>() {
+ return cache.get(Pair.of(dialect, userCredentials), new Callable<DrillJdbcConvention>() {
@Override
public DrillJdbcConvention call() {
- return new DrillJdbcConvention(dialect, plugin.getName(), plugin, username);
+ return new DrillJdbcConvention(dialect, plugin.getName(), plugin, userCredentials);
}
});
} catch (ExecutionException ex) {
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
index dfc2073f4f..b3eaef16ea 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDialectFactory.java
@@ -18,38 +18,33 @@
package org.apache.drill.exec.store.jdbc;
import org.apache.calcite.sql.SqlDialect;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.jdbc.clickhouse.ClickhouseJdbcDialect;
-import org.apache.drill.shaded.guava.com.google.common.cache.Cache;
-import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import java.time.Duration;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
public class JdbcDialectFactory {
public static final String JDBC_CLICKHOUSE_PREFIX = "jdbc:clickhouse";
public static final int CACHE_SIZE = 100;
public static final Duration CACHE_TTL = Duration.ofHours(1);
-
- private final Cache<SqlDialect, JdbcDialect> cache = CacheBuilder.newBuilder()
- .maximumSize(CACHE_SIZE)
- .expireAfterAccess(CACHE_TTL)
- .build();
+ private volatile JdbcDialect jdbcDialect;
public JdbcDialect getJdbcDialect(JdbcStoragePlugin plugin, SqlDialect dialect) {
- try {
- return cache.get(dialect, new Callable<JdbcDialect>() {
- @Override
- public JdbcDialect call() {
- return plugin.getConfig().getUrl().startsWith(JDBC_CLICKHOUSE_PREFIX)
- ? new ClickhouseJdbcDialect(plugin, dialect)
- : new DefaultJdbcDialect(plugin, dialect);
+ // Note: any given JdbcDialectFactory instance will only ever be called with
+ // a single SqlDialect so we can cache using a single member.
+ JdbcDialect jd = jdbcDialect;
+ if (jd == null) {
+ // Double checked locking using a volatile member and a local var
+ // optimisation to reduce volatile accesses.
+ synchronized (this) {
+ jd = jdbcDialect;
+ if (jd == null) {
+ jd = plugin.getConfig().getUrl().startsWith(JDBC_CLICKHOUSE_PREFIX)
+ ? new ClickhouseJdbcDialect(plugin, dialect)
+ : new DefaultJdbcDialect(plugin, dialect);
+ jdbcDialect = jd;
}
- });
- } catch (ExecutionException ex) {
- throw new DrillRuntimeException("Cannot load the requested JdbcDialect", ex);
+ }
}
+ return jd;
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
index 1c86e6447d..5dc5d7c2de 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcInsertWriterBatchCreator.java
@@ -49,7 +49,7 @@ public class JdbcInsertWriterBatchCreator implements BatchCreator<JdbcInsertWrit
config,
children.iterator().next(),
context,
- new JdbcTableModifyWriter(ds, config.getTableIdentifier(), config)
+ new JdbcTableModifyWriter(userCreds, config.getTableIdentifier(), config)
);
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
index f21b986396..7cea799b17 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.SinglePrel;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
/**
@@ -35,11 +36,11 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
* before execution can happen.
*/
public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable {
- private final String username;
+ private final UserCredentials userCredentials;
- public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, String username) {
+ public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, UserCredentials userCredentials) {
super(cluster, traits, child);
- this.username = username;
+ this.userCredentials = userCredentials;
}
@Override
@@ -49,7 +50,7 @@ public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new JdbcIntermediatePrel(getCluster(), traitSet, getInput(), username);
+ return new JdbcIntermediatePrel(getCluster(), traitSet, getInput(), userCredentials);
}
@Override
@@ -64,7 +65,7 @@ public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable
@Override
public Prel finalizeRel() {
- return new JdbcPrel(getCluster(), getTraitSet(), this, username);
+ return new JdbcPrel(getCluster(), getTraitSet(), this, userCredentials);
}
@Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
index 62250b4662..57b986667b 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
@@ -27,15 +27,16 @@ import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
class JdbcIntermediatePrelConverterRule extends RelOptRule {
private final RelTrait inTrait;
private final RelTrait outTrait;
- private final String username;
+ private final UserCredentials userCredentials;
- public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention, String username) {
+ public JdbcIntermediatePrelConverterRule(JdbcConvention jdbcConvention, UserCredentials userCredentials) {
super(
RelOptHelper.some(VertexDrel.class, DrillRel.DRILL_LOGICAL,
RelOptHelper.any(RelNode.class, jdbcConvention)),
@@ -43,7 +44,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
this.inTrait = DrillRel.DRILL_LOGICAL;
this.outTrait = Prel.DRILL_PHYSICAL;
- this.username = username;
+ this.userCredentials = userCredentials;
}
@Override
@@ -52,7 +53,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
in.getCluster(),
in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
- in.getInput(0), username);
+ in.getInput(0), userCredentials);
call.transformTo(jdbcIntermediatePrel);
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index b126d948a3..00753c8971 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import java.util.ArrayList;
@@ -43,12 +44,12 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
private final String sql;
private final double rows;
private final DrillJdbcConvention convention;
- private final String username;
+ private final UserCredentials userCredentials;
- public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel, String username) {
+ public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel, UserCredentials userCredentials) {
super(cluster, traitSet);
final RelNode input = prel.getInput();
- this.username = username;
+ this.userCredentials = userCredentials;
rows = input.estimateRowCount(cluster.getMetadataQuery());
convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
JdbcDialect jdbcDialect = convention.getPlugin().getJdbcDialect(convention.dialect);
@@ -74,7 +75,7 @@ public class JdbcPrel extends AbstractRelNode implements Prel {
for (String col : rowType.getFieldNames()) {
columns.add(SchemaPath.getSimplePath(col));
}
- JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows, username);
+ JdbcGroupScan output = new JdbcGroupScan(sql, columns, convention.getPlugin(), rows, userCredentials.getUserName());
return creator.addMetadata(this, output);
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
index ebe907392a..7f767cf21b 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
@@ -39,7 +40,6 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.sql.DataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
@@ -60,15 +60,15 @@ public class JdbcRecordWriter extends AbstractRecordWriter {
private final JdbcWriter config;
private int recordCount;
- public JdbcRecordWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
+ public JdbcRecordWriter(UserCredentials userCredentials, List<String> tableIdentifier, JdbcWriter config) {
this.tableIdentifier = tableIdentifier;
- this.dialect = config.getPlugin().getDialect(source);
+ this.dialect = config.getPlugin().getDialect(userCredentials);
this.config = config;
this.recordCount = 0;
this.insertStatementBuilder = getInsertStatementBuilder(tableIdentifier);
try {
- this.connection = source.getConnection();
+ this.connection = config.getPlugin().getDataSource(userCredentials).get().getConnection();
} catch (SQLException e) {
throw UserException.connectionError()
.message("Unable to open JDBC connection for writing.")
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 8489087465..c968d518ec 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -56,6 +56,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
private final JdbcStorageConfig jdbcStorageConfig;
private final JdbcDialectFactory dialectFactory;
private final JdbcConventionFactory conventionFactory;
+ private volatile SqlDialect sqlDialect;
// DataSources for this storage config keyed on JDBC username
private final Map<String, HikariDataSource> dataSources = new ConcurrentHashMap<>();
@@ -79,7 +80,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
return;
}
- SqlDialect dialect = getDialect(dataSource.get());
+ SqlDialect dialect = getDialect(userCreds);
getJdbcDialect(dialect).registerSchemas(config, parent);
}
@@ -108,19 +109,31 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
));
}
- public SqlDialect getDialect(DataSource dataSource) {
- return JdbcSchema.createDialect(
- SqlDialectFactoryImpl.INSTANCE,
- dataSource
- );
+ public SqlDialect getDialect(UserCredentials userCredentials) {
+ SqlDialect sd = sqlDialect;
+ if (sd == null) {
+ // Double checked locking using a volatile member and a local var
+ // optimisation to reduce volatile accesses.
+ synchronized (this) {
+ sd = sqlDialect;
+ if (sd == null) {
+ sd = JdbcSchema.createDialect(
+ SqlDialectFactoryImpl.INSTANCE,
+ getDataSource(userCredentials).get()
+ );
+ sqlDialect = sd;
+ }
+ }
+ }
+ return sd;
}
public JdbcDialect getJdbcDialect(SqlDialect dialect) {
return dialectFactory.getJdbcDialect(this, dialect);
}
- public DrillJdbcConvention getConvention(SqlDialect dialect, String username) {
- return conventionFactory.getJdbcConvention(this, dialect, username);
+ public DrillJdbcConvention getConvention(SqlDialect dialect, UserCredentials userCredentials) {
+ return conventionFactory.getJdbcConvention(this, dialect, userCredentials);
}
@Override
@@ -151,9 +164,8 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
case PHYSICAL: {
UserCredentials userCreds = optimizerContext.getContextInformation().getQueryUserCredentials();
- String userName = userCreds.getUserName();
return getDataSource(userCreds)
- .map(dataSource -> getConvention(getDialect(dataSource), userName).getRules())
+ .map(dataSource -> getConvention(getDialect(userCreds), userCreds).getRules())
.orElse(ImmutableSet.of());
}
case LOGICAL_PRUNE_AND_JOIN:
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
index 3895a087cd..4cd84b2499 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcTableModifyWriter.java
@@ -17,15 +17,15 @@
*/
package org.apache.drill.exec.store.jdbc;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.record.VectorAccessible;
-import javax.sql.DataSource;
import java.util.List;
public class JdbcTableModifyWriter extends JdbcRecordWriter {
- public JdbcTableModifyWriter(DataSource source, List<String> tableIdentifier, JdbcWriter config) {
- super(source, tableIdentifier, config);
+ public JdbcTableModifyWriter(UserCredentials userCredentials, List<String> tableIdentifier, JdbcWriter config) {
+ super(userCredentials, tableIdentifier, config);
}
@Override
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
index 4373ba6303..0e00e471fb 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcWriterBatchCreator.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.jdbc;
import java.util.List;
-import javax.sql.DataSource;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -38,18 +36,12 @@ public class JdbcWriterBatchCreator implements BatchCreator<JdbcWriter> {
assert children != null && children.size() == 1;
UserCredentials userCreds = context.getContextInformation().getQueryUserCredentials();
- DataSource ds = config.getPlugin().getDataSource(userCreds)
- .orElseThrow(() -> new ExecutionSetupException(String.format(
- "Query user %s could obtain a connection to %s, missing credentials?",
- userCreds.getUserName(),
- config.getPlugin().getName()
- )));
return new WriterRecordBatch(
config,
children.iterator().next(),
context,
- new JdbcRecordWriter(ds, config.getTableIdentifier(), config)
+ new JdbcRecordWriter(userCreds, config.getTableIdentifier(), config)
);
}
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
index 59816f30c1..d1a18a6c26 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/clickhouse/ClickhouseJdbcDialect.java
@@ -51,7 +51,7 @@ public class ClickhouseJdbcDialect implements JdbcDialect {
if (!dataSource.isPresent()) {
return;
}
- DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials().getUserName());
+ DrillJdbcConvention convention = plugin.getConvention(dialect, config.getQueryUserCredentials());
ClickhouseCatalogSchema schema = new ClickhouseCatalogSchema(
plugin.getName(),
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index d80b959a04..7fb53ce3a3 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -24,6 +24,7 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
@@ -67,8 +68,16 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
+ public Set<StoragePluginOptimizerRule> getOptimizerRules(
+ OptimizerRulesContext optimizerRulesContext,
+ PlannerPhase phase
+ ) {
+ switch (phase) {
+ case PHYSICAL:
+ return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
+ default:
+ return ImmutableSet.of();
+ }
}
@Override
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
index 542099565a..6beb6b3711 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
@@ -36,6 +36,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
@@ -47,6 +48,7 @@ import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tephra.shaded.com.google.common.collect.ImmutableSet;
public class PhoenixStoragePlugin extends AbstractStoragePlugin {
@@ -88,8 +90,16 @@ public class PhoenixStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<? extends RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return convention.getRules();
+ public Set<? extends RelOptRule> getOptimizerRules(
+ OptimizerRulesContext optimizerRulesContext,
+ PlannerPhase phase
+ ) {
+ switch (phase) {
+ case PHYSICAL:
+ return convention.getRules();
+ default:
+ return ImmutableSet.of();
+ }
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
index 1f8b16262b..380f0d52a2 100644
--- a/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/calcite/jdbc/DynamicRootSchema.java
@@ -26,6 +26,7 @@ import org.apache.calcite.util.BuiltInMethod;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserExceptionUtils;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.alias.AliasRegistryProvider;
import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.store.AbstractSchema;
@@ -37,7 +38,6 @@ import org.apache.drill.exec.store.SubSchemaWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -91,22 +91,68 @@ public class DynamicRootSchema extends DynamicSchema {
public SchemaPath resolveTableAlias(String alias) {
return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry()
- .getUserAliases(schemaConfig.getUserName()).get(alias))
+ .getUserAliases(schemaConfig.getUserName()).get(alias))
.map(SchemaPath::parseFromString)
.orElse(null);
}
+ private void registerSchemasWithRetry(StoragePlugin plugin) throws Exception {
+ long maxAttempts = 1 + schemaConfig
+ .getOption(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS)
+ .num_val;
+ long retryDelayMs = schemaConfig
+ .getOption(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY)
+ .num_val;
+ int attempt=0;
+ Exception lastAttemptEx = null;
+
+ while (attempt++ < maxAttempts) {
+ try {
+ plugin.registerSchemas(schemaConfig, plus());
+ return;
+ } catch (Exception ex) {
+ lastAttemptEx = ex;
+ logger.warn(
+ "Attempt {} of {} to register schemas for plugin {} failed.",
+ attempt, maxAttempts, plugin,
+ ex
+ );
+
+ if (attempt < maxAttempts) {
+ logger.info(
+ "Next attempt to register schemas for plugin {} will be made in {}ms.",
+ plugin,
+ retryDelayMs
+ );
+ try {
+ Thread.sleep(retryDelayMs);
+ } catch (InterruptedException intEx) {
+ logger.warn(
+ "Interrupted while waiting to make another attempt to register " +
+ "schemas for plugin {}.",
+ plugin,
+ intEx
+ );
+ }
+ }
+ }
+ }
+
+ throw lastAttemptEx;
+ }
+
/**
* Loads schema factory(storage plugin) for specified {@code schemaName}
* @param schemaName the name of the schema
* @param caseSensitive whether matching for the schema name is case sensitive
*/
private void loadSchemaFactory(String schemaName, boolean caseSensitive) {
+ StoragePlugin plugin = null;
try {
SchemaPlus schemaPlus = this.plus();
- StoragePlugin plugin = storages.getPlugin(schemaName);
+ plugin = storages.getPlugin(schemaName);
if (plugin != null) {
- plugin.registerSchemas(schemaConfig, schemaPlus);
+ registerSchemasWithRetry(plugin);
return;
}
@@ -122,7 +168,7 @@ public class DynamicRootSchema extends DynamicSchema {
SchemaPlus firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
if (firstLevelSchema == null) {
// register schema for this storage plugin to 'this'.
- plugin.registerSchemas(schemaConfig, schemaPlus);
+ registerSchemasWithRetry(plugin);
firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
}
// Load second level schemas for this storage plugin
@@ -142,15 +188,32 @@ public class DynamicRootSchema extends DynamicSchema {
schemaPlus.add(wrapper.getName(), wrapper);
}
}
- } catch(PluginException | IOException ex) {
- logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
+ } catch (Exception ex) {
+ logger.error("Failed to load schema for {}", schemaName, ex);
// We can't proceed further without a schema, throw a runtime exception.
UserException.Builder exceptBuilder =
UserException
- .resourceError(ex)
- .message("Failed to load schema for \"" + schemaName + "\"!")
- .addContext(ex.getClass().getName() + ": " + ex.getMessage())
+ .pluginError(ex)
+ .message("Failed to load schema for schema %s", schemaName)
+ .addContext("%s: %s", ex.getClass().getName(), ex.getMessage())
.addContext(UserExceptionUtils.getUserHint(ex)); //Provide hint if it exists
+
+ if (schemaConfig.getOption(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE).bool_val) {
+ String msg = String.format(
+ "The plugin %s will now be disabled (see SYSTEM option %s)",
+ plugin.getName(),
+ ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE
+ );
+ exceptBuilder.addContext(msg);
+ logger.warn(msg);
+
+ try {
+ storages.setEnabled(plugin.getName(), false);
+ } catch (PluginException disableEx) {
+ logger.error("Could not disable {}", plugin.getName(), disableEx);
+ }
+ }
+
throw exceptBuilder.build(logger);
}
}
@@ -187,4 +250,3 @@ public class DynamicRootSchema extends DynamicSchema {
}
}
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 695f8b6b02..ded472e32c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValid
import org.apache.drill.exec.server.options.TypeValidators.IntegerValidator;
import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
import org.apache.drill.exec.server.options.TypeValidators.MaxWidthValidator;
+import org.apache.drill.exec.server.options.TypeValidators.NonNegativeLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
@@ -1094,6 +1095,36 @@ public final class ExecConstants {
new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. " +
"Default is false. (Drill 1.15+)"));
+ public static final String STORAGE_PLUGIN_RETRY_ATTEMPTS = "storage.plugin_retry_attempts";
+ public static final LongValidator STORAGE_PLUGIN_RETRY_ATTEMPTS_VALIDATOR = new NonNegativeLongValidator(
+ STORAGE_PLUGIN_RETRY_ATTEMPTS,
+ 10,
+ new OptionDescription(
+ "The maximum number of retries that will be attempted to request metadata " +
+ "for query planning from a storage plugin."
+ )
+ );
+ public static final String STORAGE_PLUGIN_RETRY_DELAY = "storage.plugin_retry_attempt_delay";
+ public static final LongValidator STORAGE_PLUGIN_RETRY_DELAY_VALIDATOR = new NonNegativeLongValidator(
+ STORAGE_PLUGIN_RETRY_DELAY,
+ 5 * 1000,
+ new OptionDescription(String.format(
+ "The delay in milliseconds between repeated attempts to request metadata " +
+ "for query planning from a storage plugin (see %s).",
+ STORAGE_PLUGIN_RETRY_ATTEMPTS
+ ))
+ );
+ public static final String STORAGE_PLUGIN_AUTO_DISABLE = "storage.plugin_auto_disable";
+ public static final BooleanValidator STORAGE_PLUGIN_AUTO_DISABLE_VALIDATOR = new BooleanValidator(
+ STORAGE_PLUGIN_AUTO_DISABLE,
+ new OptionDescription(String.format(
+ "Controls whether a storage plugin will automatically be disabled after " +
+ "the configured number of attempts to request metadata for query " +
+ " planning from it have failed (see %s)",
+ STORAGE_PLUGIN_RETRY_ATTEMPTS
+ ))
+ );
+
public static final String RETURN_RESULT_SET_FOR_DDL = "exec.query.return_result_set_for_ddl";
public static final BooleanValidator RETURN_RESULT_SET_FOR_DDL_VALIDATOR = new BooleanValidator(RETURN_RESULT_SET_FOR_DDL,
new OptionDescription("Controls whether to return result set for CREATE TABLE / VIEW / FUNCTION, DROP TABLE / VIEW / FUNCTION, " +
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index a0f8024fea..00ff1fc704 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -93,7 +93,6 @@ import org.apache.drill.exec.planner.physical.UnnestPrule;
import org.apache.drill.exec.planner.physical.ValuesPrule;
import org.apache.drill.exec.planner.physical.WindowPrule;
import org.apache.drill.exec.planner.physical.WriterPrule;
-import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.parquet.FilePushDownFilter;
@@ -250,16 +249,11 @@ public enum PlannerPhase {
public abstract RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins);
- @SuppressWarnings("deprecation")
private static RuleSet getStorageRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins,
PlannerPhase phase) {
final Builder<RelOptRule> rules = ImmutableSet.builder();
- for (StoragePlugin plugin : plugins) {
- if (plugin instanceof AbstractStoragePlugin) {
- rules.addAll(((AbstractStoragePlugin) plugin).getOptimizerRules(context, phase));
- } else {
- rules.addAll(plugin.getOptimizerRules(context));
- }
+ for (StoragePlugin sp : plugins) {
+ rules.addAll(sp.getOptimizerRules(context, phase));
}
return RuleSets.ofList(rules.build());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 1d75e6c0aa..96155158c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -53,6 +53,7 @@ import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
import org.apache.drill.exec.planner.sql.parser.SqlSchema;
import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
@@ -127,19 +128,34 @@ public class DrillSqlWorker {
try {
return getPhysicalPlan(context, sql, textPlan, retryAttempts);
} catch (Exception e) {
- logger.trace("There was an error during conversion into physical plan. " +
- "Will sync remote and local function registries if needed and retry " +
- "in case if issue was due to missing function implementation.", e);
- // it is prohibited to retry query planning for ANALYZE statement since it changes
+ logger.trace("There was an error during conversion into physical plan.", e);
+
+ // It is prohibited to retry query planning for ANALYZE statement since it changes
// query-level option values and will fail when rerunning with updated values
- if (context.getFunctionRegistry().syncWithRemoteRegistry(
- context.getDrillOperatorTable().getFunctionRegistryVersion())
- && context.getSQLStatementType() != SqlStatementType.ANALYZE) {
- context.reloadDrillOperatorTable();
- logger.trace("Local function registry was synchronized with remote. Trying to find function one more time.");
- return getPhysicalPlan(context, sql, textPlanCopy, retryAttempts);
+ boolean syncFuncsAndRetry = context.getSQLStatementType() != SqlStatementType.ANALYZE;
+
+ // If an error has occurred in a plugin then it will not help to look for a missing
+ // function again.
+ syncFuncsAndRetry &= !(e instanceof UserException &&
+ ((UserException) e).getErrorType() == DrillPBError.ErrorType.PLUGIN);
+
+ // Attempt a function registry sync
+ logger.trace("Will sync remote and local function registries if needed.");
+ int funcRegistryVer = context.getDrillOperatorTable().getFunctionRegistryVersion();
+ syncFuncsAndRetry &= context.getFunctionRegistry().syncWithRemoteRegistry(funcRegistryVer);
+
+ if (!syncFuncsAndRetry) {
+ // We don't want to or could not sync the function registry, return to
+ // raising the original error.
+ throw e;
}
- throw e;
+
+ context.reloadDrillOperatorTable();
+ logger.trace(
+ "Local function registry was synchronized with remote. " +
+ "Trying to find function one more time."
+ );
+ return getPhysicalPlan(context, sql, textPlanCopy, retryAttempts);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 3e1f6a73ec..e3421b4dab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -320,7 +320,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.METASTORE_RETRIEVAL_RETRY_ATTEMPTS_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_ENABLE_MAP_SUPPORT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, false, false)),
new OptionDefinition(ExecConstants.ENABLE_DYNAMIC_CREDIT_BASED_FC_VALIDATOR),
- new OptionDefinition(ExecConstants.ENABLE_ALIASES_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false))
+ new OptionDefinition(ExecConstants.ENABLE_ALIASES_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+ new OptionDefinition(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+ new OptionDefinition(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+ new OptionDefinition(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false))
};
CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 2015002f54..4613693011 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -19,18 +19,13 @@ package org.apache.drill.exec.store;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
-import org.apache.calcite.plan.RelOptRule;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.metastore.MetadataProviderManager;
-import org.apache.drill.exec.planner.PlannerPhase;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.dfs.FormatPlugin;
@@ -64,54 +59,6 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
return false;
}
- /**
- * @deprecated Marking for deprecation in next major version release. Use
- * {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
- */
- @Override
- @Deprecated
- public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext) {
- return ImmutableSet.of();
- }
-
- /**
- * @deprecated Marking for deprecation in next major version release. Use
- * {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
- */
- @Deprecated
- public Set<? extends RelOptRule> getLogicalOptimizerRules(OptimizerRulesContext optimizerContext) {
- return ImmutableSet.of();
- }
-
- /**
- * @deprecated Marking for deprecation in next major version release. Use
- * {@link #getOptimizerRules(org.apache.drill.exec.ops.OptimizerRulesContext, org.apache.drill.exec.planner.PlannerPhase)}
- */
- @Deprecated
- public Set<? extends RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- // To be backward compatible, by default call the getOptimizerRules() method.
- return getOptimizerRules(optimizerRulesContext);
- }
-
- /**
- *
- * TODO: Move this method to {@link StoragePlugin} interface in next major version release.
- */
- public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
- switch (phase) {
- case LOGICAL_PRUNE_AND_JOIN:
- case LOGICAL_PRUNE:
- case PARTITION_PRUNING:
- return getLogicalOptimizerRules(optimizerContext);
- case PHYSICAL:
- return getPhysicalOptimizerRules(optimizerContext);
- case LOGICAL:
- case JOIN_PLANNING:
- default:
- return ImmutableSet.of();
- }
- }
-
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
return getPhysicalScan(userName, selection);
@@ -162,4 +109,8 @@ public abstract class AbstractStoragePlugin implements StoragePlugin {
return context;
}
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index db4ea3a659..96b75f644c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -28,9 +28,11 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
/** Interface for all implementations of the storage plugins. Different implementations of the storage
* formats will implement methods that indicate if Drill can write or read its tables from that format,
@@ -72,8 +74,9 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
* optimizer can leverage in <i>physical</i> space. Otherwise, it should return an empty set.
* @return an empty set or a set of plugin specific physical optimizer rules.
*/
- @Deprecated
- Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext);
+ public default Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+ return ImmutableSet.of();
+ }
/**
* Get the physical scan operator for the particular GroupScan (read) node.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java
index d0b4617f5a..6ec025712b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/base/filter/FilterPushDownStrategy.java
@@ -52,10 +52,14 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
* for a particular scan.
* <p>
* General usage in a storage plugin: <code><pre>
- * public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
- * OptimizerRulesContext optimizerRulesContext) {
- * return FilterPushDownStrategy.rulesFor(optimizerRulesContext,
- * new MyPushDownListener(...));
+ * public Set<StoragePluginOptimizerRule> getOptimizerRules(
+ * OptimizerRulesContext optimizerRulesContext, PlannerPhase phase) {
+ * switch (phase) {
+ * case PHYSICAL:
+ * return FilterPushDownStrategy.rulesFor(optimizerRulesContext,
+ * new MyPushDownListener(...));
+ * ...
+ * }
* }
* </pre></code>
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index be56e11a1a..da2560edbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -126,9 +127,18 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
- return ImmutableSet.of(
- InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_PROJECT,
- InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_SCAN);
+ public Set<StoragePluginOptimizerRule> getOptimizerRules(
+ OptimizerRulesContext optimizerRulesContext,
+ PlannerPhase phase
+ ) {
+ switch (phase) {
+ case PHYSICAL:
+ return ImmutableSet.of(
+ InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_PROJECT,
+ InfoSchemaPushFilterIntoRecordGenerator.IS_FILTER_ON_SCAN
+ );
+ default:
+ return ImmutableSet.of();
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
index 488f3fb35a..d0b13de2c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
public class FileSystemUtil {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemUtil.class);
- public static final String RECURSIVE_FILE_LISTING_MAX_SIZE = "drill.exec.recursive_file_listing_max_size";
+ public static final String RECURSIVE_LISTING_PROP_NAME = "drill.exec.recursive_file_listing_max_size";
/**
* Filter that will accept all files and directories.
@@ -253,7 +253,7 @@ public class FileSystemUtil {
private static List<FileStatus> listRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) {
ForkJoinPool pool = new ForkJoinPool();
AtomicInteger fileCounter = new AtomicInteger(0);
- int recursiveListingMaxSize = fs.getConf().getInt(RECURSIVE_FILE_LISTING_MAX_SIZE, 0);
+ int recursiveListingMaxSize = fs.getConf().getInt(RECURSIVE_LISTING_PROP_NAME, 0);
try {
RecursiveListing task = new RecursiveListing(
@@ -306,7 +306,7 @@ public class FileSystemUtil {
private final Scope scope;
private final boolean suppressExceptions;
private final PathFilter filter;
- // Running count of files for comparison with RECURSIVE_FILE_LISTING_MAX_SIZE
+ // Running count of files for comparison with recursiveListingMaxSize
private final AtomicInteger fileCounter;
private final int recursiveListingMaxSize;
private final ForkJoinPool pool;
@@ -343,10 +343,11 @@ public class FileSystemUtil {
throw UserException
.resourceError()
.message(
- "File listing size limit of %d exceeded recursing through path %s, see JVM system property %s",
+ "File listing size limit of %d exceeded recursing through " +
+ "path %s, see Hadoop config property %s",
recursiveListingMaxSize,
path,
- RECURSIVE_FILE_LISTING_MAX_SIZE
+ RECURSIVE_LISTING_PROP_NAME
)
.build(logger);
} finally {
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 765b8d2fe1..a1dbdeb210 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -755,6 +755,9 @@ drill.exec.options: {
web.display_format.time: "",
window.enable: true,
storage.list_files_recursively: false,
+ storage.plugin_retry_attempts: 1,
+ storage.plugin_retry_attempt_delay: 2000,
+ storage.plugin_auto_disable: true,
# ============ index plan related options ==============
planner.use_simple_optimizer: false,
planner.enable_index_planning: true,
@@ -773,7 +776,6 @@ drill.exec.options: {
exec.query.return_result_set_for_ddl: true,
exec.query.max_rows: 0,
exec.return_result_set_for_ddl: true,
- storage.list_files_recursively: false,
exec.statistics.ndv_accuracy: 20,
exec.statistics.ndv_extrapolation_bf_elements: 1000000,
exec.statistics.ndv_extrapolation_bf_fpprobability: 10,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
index a7d27ad0e4..4a78e6436c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSchema.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.mock.MockBreakageStorage;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
@@ -28,6 +30,8 @@ import org.junit.ClassRule;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class TestSchema extends DrillTest {
@@ -41,21 +45,66 @@ public class TestSchema extends DrillTest {
public static void setup() throws Exception {
cluster = ClusterFixture.builder(dirTestWatcher).buildCustomMockStorage();
boolean breakRegisterSchema = true;
- // With a broken storage which will throw exception in registerSchema, every query (even on other storage)
- // shall fail if Drill is still loading all schemas (include the broken schema) before a query.
cluster.insertMockStorage("mock_broken", breakRegisterSchema);
cluster.insertMockStorage("mock_good", !breakRegisterSchema);
client = cluster.clientFixture();
}
@Test (expected = Exception.class)
- public void testQueryBrokenStorage() throws Exception {
+ public void testQueryBrokenRegSchema() throws Exception {
+ MockBreakageStorage mbs = (MockBreakageStorage) cluster.storageRegistry().getPlugin("mock_broken");
String sql = "SELECT id_i, name_s10 FROM `mock_broken`.`employees_5`";
+
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE, false);
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS, 2);
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY, 0);
+
+ mbs.setBreakRegister(true);
try {
client.queryBuilder().sql(sql).run();
} catch (Exception ex) {
- assertTrue(ex.getMessage().contains("RESOURCE ERROR: Failed to load schema"));
+ assertTrue(
+ ex.getMessage()
+ .split(System.lineSeparator())[0]
+ .matches("^RESOURCE ERROR: Failed to load schema.*")
+ );
+
+ assertEquals(3, mbs.registerAttemptCount);
+ // The plugin should still be enabled because we set auto_disable to false.
+ assertTrue(cluster.storageRegistry().availablePlugins().contains("mock_broken"));
+ throw ex;
+ } finally {
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE);
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS);
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY);
+ }
+ }
+
+ @Test (expected = Exception.class)
+ public void testAutoDisableBrokenRegSchema() throws Exception {
+ MockBreakageStorage mbs = (MockBreakageStorage) cluster.storageRegistry().getPlugin("mock_broken");
+ String sql = "SELECT id_i, name_s10 FROM `mock_broken`.`employees_5`";
+
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE, true);
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS, 0);
+
+ mbs.setBreakRegister(true);
+ try {
+ client.queryBuilder().sql(sql).run();
+ } catch (Exception ex) {
+ assertTrue(
+ ex.getMessage()
+ .split(System.lineSeparator())[0]
+ .matches("^PLUGIN ERROR: Failed to load schema.*")
+ );
+
+ // The plugin should no longer be enabled because we set auto_disable to false.
+ assertFalse(cluster.storageRegistry().availablePlugins().contains("mock_broken"));
throw ex;
+ } finally {
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE);
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS);
+ cluster.storageRegistry().setEnabled("mock_broken", true);
}
}
@@ -75,12 +124,20 @@ public class TestSchema extends DrillTest {
@Test (expected = Exception.class)
public void testUseBrokenStorage() throws Exception {
+ MockBreakageStorage mbs = (MockBreakageStorage) cluster.storageRegistry().getPlugin("mock_broken");
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE, false);
+ client.alterSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS, 0);
+
+ mbs.setBreakRegister(true);
try {
String use_dfs = "use mock_broken";
client.queryBuilder().sql(use_dfs).run();
} catch(Exception ex) {
assertTrue(ex.getMessage().contains("RESOURCE ERROR: Failed to load schema"));
throw ex;
+ } finally {
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE);
+ client.resetSystem(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
index 48ee441ab6..269dbbcfa6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
@@ -206,15 +206,15 @@ public class FileSystemUtilTest extends FileSystemUtilTestBase {
@Test // DRILL-8283
public void testRecursiveListingMaxSize() throws IOException {
Configuration conf = fs.getConf();
- int oldSize = conf.getInt(FileSystemUtil.RECURSIVE_FILE_LISTING_MAX_SIZE, 0);
- conf.setInt(FileSystemUtil.RECURSIVE_FILE_LISTING_MAX_SIZE, 5);
+ int oldSize = conf.getInt(FileSystemUtil.RECURSIVE_LISTING_PROP_NAME, 0);
+ conf.setInt(FileSystemUtil.RECURSIVE_LISTING_PROP_NAME, 5);
try {
FileSystemUtil.listAll(fs, new Path(base, "a"), true);
} catch (UserException ex) {
assertThat(ex.getMessage(), containsString("RESOURCE ERROR: File listing size limit"));
} finally {
- conf.setInt(FileSystemUtil.RECURSIVE_FILE_LISTING_MAX_SIZE, oldSize);
+ conf.setInt(FileSystemUtil.RECURSIVE_LISTING_PROP_NAME, oldSize);
}
}
}
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 5071b5d793..622490b67f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2046,6 +2046,16 @@ public final class UserBitShared {
* <code>UNSPECIFIED_ERROR = 13;</code>
*/
UNSPECIFIED_ERROR(13),
+ /**
+ * <pre>
+ * Plugin exception
+ * - A failure has occurred within a plugin.
+ * Indicates that a plugin is misconfigured or contains a bug.
+ * </pre>
+ *
+ * <code>PLUGIN = 14;</code>
+ */
+ PLUGIN(14),
;
/**
@@ -2188,6 +2198,16 @@ public final class UserBitShared {
* <code>UNSPECIFIED_ERROR = 13;</code>
*/
public static final int UNSPECIFIED_ERROR_VALUE = 13;
+ /**
+ * <pre>
+ * Plugin exception
+ * - A failure has occurred within a plugin.
+ * Indicates that a plugin is misconfigured or contains a bug.
+ * </pre>
+ *
+ * <code>PLUGIN = 14;</code>
+ */
+ public static final int PLUGIN_VALUE = 14;
public final int getNumber() {
@@ -2224,6 +2244,7 @@ public final class UserBitShared {
case 11: return EXECUTION_ERROR;
case 12: return INTERNAL_ERROR;
case 13: return UNSPECIFIED_ERROR;
+ case 14: return PLUGIN;
default: return null;
}
}
@@ -28533,113 +28554,114 @@ public final class UserBitShared {
"s.proto\032\022Coordination.proto\032\017SchemaDef.p" +
"roto\"$\n\017UserCredentials\022\021\n\tuser_name\030\001 \001" +
"(\t\"\'\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 " +
- "\001(\020\"\355\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
+ "\001(\020\"\371\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
"\n\010endpoint\030\002 \001(\0132\026.exec.DrillbitEndpoint" +
"\0227\n\nerror_type\030\003 \001(\0162#.exec.shared.Drill" +
"PBError.ErrorType\022\017\n\007message\030\004 \001(\t\0220\n\tex" +
"ception\030\005 \001(\0132\035.exec.shared.ExceptionWra" +
"pper\0220\n\rparsing_error\030\006 \003(\0132\031.exec.share" +
- "d.ParsingError\"\362\001\n\tErrorType\022\016\n\nCONNECTI" +
+ "d.ParsingError\"\376\001\n\tErrorType\022\016\n\nCONNECTI" +
"ON\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDATA_WRITE\020\002\022\014\n\010F" +
"UNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPERMISSION\020\005\022\010\n\004" +
"PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SYSTEM\020\010\022\031\n\025UNSU" +
"PPORTED_OPERATION\020\t\022\016\n\nVALIDATION\020\n\022\023\n\017E" +
"XECUTION_ERROR\020\013\022\022\n\016INTERNAL_ERROR\020\014\022\025\n\021" +
- "UNSPECIFIED_ERROR\020\r\"\246\001\n\020ExceptionWrapper" +
- "\022\027\n\017exception_class\030\001 \001(\t\022\017\n\007message\030\002 \001" +
- "(\t\022:\n\013stack_trace\030\003 \003(\0132%.exec.shared.St" +
- "ackTraceElementWrapper\022,\n\005cause\030\004 \001(\0132\035." +
- "exec.shared.ExceptionWrapper\"\205\001\n\030StackTr" +
- "aceElementWrapper\022\022\n\nclass_name\030\001 \001(\t\022\021\n" +
- "\tfile_name\030\002 \001(\t\022\023\n\013line_number\030\003 \001(\005\022\023\n" +
- "\013method_name\030\004 \001(\t\022\030\n\020is_native_method\030\005" +
- " \001(\010\"\\\n\014ParsingError\022\024\n\014start_column\030\002 \001" +
- "(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(" +
- "\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016RecordBatchDef\022\024\n" +
- "\014record_count\030\001 \001(\005\022+\n\005field\030\002 \003(\0132\034.exe" +
- "c.shared.SerializedField\022)\n!carries_two_" +
- "byte_selection_vector\030\003 \001(\010\022\033\n\023affected_" +
- "rows_count\030\004 \001(\005\"\205\001\n\010NamePart\022(\n\004type\030\001 " +
- "\001(\0162\032.exec.shared.NamePart.Type\022\014\n\004name\030" +
- "\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec.shared.NameP" +
- "art\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARRAY\020\001\"\324\001\n\017Ser" +
- "ializedField\022%\n\nmajor_type\030\001 \001(\0132\021.commo" +
- "n.MajorType\022(\n\tname_part\030\002 \001(\0132\025.exec.sh" +
- "ared.NamePart\022+\n\005child\030\003 \003(\0132\034.exec.shar" +
- "ed.SerializedField\022\023\n\013value_count\030\004 \001(\005\022" +
- "\027\n\017var_byte_length\030\005 \001(\005\022\025\n\rbuffer_lengt" +
- "h\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007node_id\030\001 \001(\005\022\030" +
- "\n\020memory_footprint\030\002 \001(\003\"\263\002\n\013QueryResult" +
- "\0228\n\013query_state\030\001 \001(\0162#.exec.shared.Quer" +
- "yResult.QueryState\022&\n\010query_id\030\002 \001(\0132\024.e" +
- "xec.shared.QueryId\022(\n\005error\030\003 \003(\0132\031.exec" +
- ".shared.DrillPBError\"\227\001\n\nQueryState\022\014\n\010S" +
- "TARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n" +
- "\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026CANCELLATION_" +
- "REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007" +
- "\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData\022&\n\010query_id" +
- "\030\001 \001(\0132\024.exec.shared.QueryId\022\021\n\trow_coun" +
- "t\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.Recor" +
- "dBatchDef\022\033\n\023affected_rows_count\030\004 \001(\005\"\330" +
- "\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001" +
- "(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.QueryRes" +
- "ult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007forem" +
- "an\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014opti" +
- "ons_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqu" +
- "eue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryProfile\022 \n\002id" +
- "\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001" +
- "(\0162\026.exec.shared.QueryType\022\r\n\005start\030\003 \001(" +
- "\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 " +
- "\001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndp" +
- "oint\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryR" +
- "esult.QueryState\022\027\n\017total_fragments\030\t \001(" +
- "\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020fragmen" +
- "t_profile\030\013 \003(\0132!.exec.shared.MajorFragm" +
- "entProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001" +
- "(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001" +
- "(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021" +
- " \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 " +
- "\001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 " +
- "\001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\tautoLimit\030\027 \001" +
- "(\005\022\027\n\017scanned_plugins\030\030 \003(\t\"t\n\024MajorFrag" +
- "mentProfile\022\031\n\021major_fragment_id\030\001 \001(\005\022A" +
- "\n\026minor_fragment_profile\030\002 \003(\0132!.exec.sh" +
- "ared.MinorFragmentProfile\"\350\002\n\024MinorFragm" +
- "entProfile\022)\n\005state\030\001 \001(\0162\032.exec.shared." +
- "FragmentState\022(\n\005error\030\002 \001(\0132\031.exec.shar" +
- "ed.DrillPBError\022\031\n\021minor_fragment_id\030\003 \001" +
- "(\005\0226\n\020operator_profile\030\004 \003(\0132\034.exec.shar" +
- "ed.OperatorProfile\022\022\n\nstart_time\030\005 \001(\003\022\020" +
- "\n\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n" +
- "\017max_memory_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\013" +
- "2\026.exec.DrillbitEndpoint\022\023\n\013last_update\030" +
- "\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\237\002\n\017Operato" +
- "rProfile\0221\n\rinput_profile\030\001 \003(\0132\032.exec.s" +
- "hared.StreamProfile\022\023\n\013operator_id\030\003 \001(\005" +
- "\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022\023\n\013setup_nan" +
- "os\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033peak_" +
- "local_memory_allocated\030\007 \001(\003\022(\n\006metric\030\010" +
- " \003(\0132\030.exec.shared.MetricValue\022\022\n\nwait_n" +
- "anos\030\t \001(\003\022\032\n\022operator_type_name\030\n \001(\t\"B" +
- "\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batc" +
- "hes\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValu" +
- "e\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003" +
- "\022\024\n\014double_value\030\003 \001(\001\")\n\010Registry\022\035\n\003ja" +
- "r\030\001 \003(\0132\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name" +
- "\030\001 \001(\t\022\032\n\022function_signature\030\002 \003(\t\"W\n\013Sa" +
- "slMessage\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001" +
- "(\014\022\'\n\006status\030\003 \001(\0162\027.exec.shared.SaslSta" +
- "tus*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BI" +
- "T_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001" +
- "\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION" +
- "\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207\001\n\rFragmentS" +
- "tate\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION" +
- "\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELL" +
- "ED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQUEST" +
- "ED\020\006*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\n" +
- "SASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SA" +
- "SL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apa" +
- "che.drill.exec.protoB\rUserBitSharedH\001"
+ "UNSPECIFIED_ERROR\020\r\022\n\n\006PLUGIN\020\016\"\246\001\n\020Exce" +
+ "ptionWrapper\022\027\n\017exception_class\030\001 \001(\t\022\017\n" +
+ "\007message\030\002 \001(\t\022:\n\013stack_trace\030\003 \003(\0132%.ex" +
+ "ec.shared.StackTraceElementWrapper\022,\n\005ca" +
+ "use\030\004 \001(\0132\035.exec.shared.ExceptionWrapper" +
+ "\"\205\001\n\030StackTraceElementWrapper\022\022\n\nclass_n" +
+ "ame\030\001 \001(\t\022\021\n\tfile_name\030\002 \001(\t\022\023\n\013line_num" +
+ "ber\030\003 \001(\005\022\023\n\013method_name\030\004 \001(\t\022\030\n\020is_nat" +
+ "ive_method\030\005 \001(\010\"\\\n\014ParsingError\022\024\n\014star" +
+ "t_column\030\002 \001(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend" +
+ "_column\030\004 \001(\005\022\017\n\007end_row\030\005 \001(\005\"\233\001\n\016Recor" +
+ "dBatchDef\022\024\n\014record_count\030\001 \001(\005\022+\n\005field" +
+ "\030\002 \003(\0132\034.exec.shared.SerializedField\022)\n!" +
+ "carries_two_byte_selection_vector\030\003 \001(\010\022" +
+ "\033\n\023affected_rows_count\030\004 \001(\005\"\205\001\n\010NamePar" +
+ "t\022(\n\004type\030\001 \001(\0162\032.exec.shared.NamePart.T" +
+ "ype\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec." +
+ "shared.NamePart\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARR" +
+ "AY\020\001\"\324\001\n\017SerializedField\022%\n\nmajor_type\030\001" +
+ " \001(\0132\021.common.MajorType\022(\n\tname_part\030\002 \001" +
+ "(\0132\025.exec.shared.NamePart\022+\n\005child\030\003 \003(\013" +
+ "2\034.exec.shared.SerializedField\022\023\n\013value_" +
+ "count\030\004 \001(\005\022\027\n\017var_byte_length\030\005 \001(\005\022\025\n\r" +
+ "buffer_length\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007nod" +
+ "e_id\030\001 \001(\005\022\030\n\020memory_footprint\030\002 \001(\003\"\263\002\n" +
+ "\013QueryResult\0228\n\013query_state\030\001 \001(\0162#.exec" +
+ ".shared.QueryResult.QueryState\022&\n\010query_" +
+ "id\030\002 \001(\0132\024.exec.shared.QueryId\022(\n\005error\030" +
+ "\003 \003(\0132\031.exec.shared.DrillPBError\"\227\001\n\nQue" +
+ "ryState\022\014\n\010STARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCO" +
+ "MPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026C" +
+ "ANCELLATION_REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n" +
+ "\tPREPARING\020\007\022\014\n\010PLANNING\020\010\"\215\001\n\tQueryData" +
+ "\022&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId" +
+ "\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec." +
+ "shared.RecordBatchDef\022\033\n\023affected_rows_c" +
+ "ount\030\004 \001(\005\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022" +
+ "\r\n\005start\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.sha" +
+ "red.QueryResult.QueryState\022\017\n\004user\030\004 \001(\t" +
+ ":\001-\022\'\n\007foreman\030\005 \001(\0132\026.exec.DrillbitEndp" +
+ "oint\022\024\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost" +
+ "\030\007 \001(\001\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\337\004\n\014QueryP" +
+ "rofile\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId" +
+ "\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryType\022\r" +
+ "\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(" +
+ "\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec." +
+ "DrillbitEndpoint\0222\n\005state\030\010 \001(\0162#.exec.s" +
+ "hared.QueryResult.QueryState\022\027\n\017total_fr" +
+ "agments\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(" +
+ "\005\022;\n\020fragment_profile\030\013 \003(\0132!.exec.share" +
+ "d.MajorFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022" +
+ "\r\n\005error\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010" +
+ "error_id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014op" +
+ "tions_json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014que" +
+ "ueWaitEnd\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nq" +
+ "ueue_name\030\025 \001(\t:\001-\022\017\n\007queryId\030\026 \001(\t\022\021\n\ta" +
+ "utoLimit\030\027 \001(\005\022\027\n\017scanned_plugins\030\030 \003(\t\"" +
+ "t\n\024MajorFragmentProfile\022\031\n\021major_fragmen" +
+ "t_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030\002 \003" +
+ "(\0132!.exec.shared.MinorFragmentProfile\"\350\002" +
+ "\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\0162\032." +
+ "exec.shared.FragmentState\022(\n\005error\030\002 \001(\013" +
+ "2\031.exec.shared.DrillPBError\022\031\n\021minor_fra" +
+ "gment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 \003(\013" +
+ "2\034.exec.shared.OperatorProfile\022\022\n\nstart_" +
+ "time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_u" +
+ "sed\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n\010en" +
+ "dpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022\023\n\013" +
+ "last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003" +
+ "\"\237\002\n\017OperatorProfile\0221\n\rinput_profile\030\001 " +
+ "\003(\0132\032.exec.shared.StreamProfile\022\023\n\013opera" +
+ "tor_id\030\003 \001(\005\022\031\n\roperator_type\030\004 \001(\005B\002\030\001\022" +
+ "\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 " +
+ "\001(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003" +
+ "\022(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricVal" +
+ "ue\022\022\n\nwait_nanos\030\t \001(\003\022\032\n\022operator_type_" +
+ "name\030\n \001(\t\"B\n\rStreamProfile\022\017\n\007records\030\001" +
+ " \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J" +
+ "\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong" +
+ "_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n\010Re" +
+ "gistry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar\"/\n" +
+ "\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signature" +
+ "\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 \001(\t" +
+ "\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec.sh" +
+ "ared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_CON" +
+ "TROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryT" +
+ "ype\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022" +
+ "\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207" +
+ "\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITIN" +
+ "G_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020" +
+ "\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLA" +
+ "TION_REQUESTED\020\006*g\n\nSaslStatus\022\020\n\014SASL_U" +
+ "NKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROG" +
+ "RESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020" +
+ "\004B.\n\033org.apache.drill.exec.protoB\rUserBi" +
+ "tSharedH\001"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 2ed8826db6..dafd533cdd 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -123,6 +123,11 @@ message DrillPBError{
* closer to the cause.
*/
UNSPECIFIED_ERROR = 13;
+ /* Plugin exception
+ * - A failure has occurred within a plugin.
+ * Indicates that a plugin is misconfigured or contains a bug.
+ */
+ PLUGIN = 14;
}
optional string error_id = 1; // for debug tracing purposes
optional DrillbitEndpoint endpoint = 2;