You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/12 10:32:44 UTC
[1/4] drill git commit: DRILL-3010: Convert bad command error
messages into UserExceptions in SqlHandlers
Repository: drill
Updated Branches:
refs/heads/master 7c5a1f57c -> 8d5778362
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index ec92392..92afa4f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2056,6 +2056,16 @@ public final class UserBitShared {
* </pre>
*/
UNSUPPORTED_OPERATION(9, 9),
+ /**
+ * <code>VALIDATION = 10;</code>
+ *
+ * <pre>
+ * SQL validation exception
+ * - invalid schema path
+ * - invalid entries in SQL tree
+ * </pre>
+ */
+ VALIDATION(10, 10),
;
/**
@@ -2154,6 +2164,16 @@ public final class UserBitShared {
* </pre>
*/
public static final int UNSUPPORTED_OPERATION_VALUE = 9;
+ /**
+ * <code>VALIDATION = 10;</code>
+ *
+ * <pre>
+ * SQL validation exception
+ * - invalid schema path
+ * - invalid entries in SQL tree
+ * </pre>
+ */
+ public static final int VALIDATION_VALUE = 10;
public final int getNumber() { return value; }
@@ -2170,6 +2190,7 @@ public final class UserBitShared {
case 7: return RESOURCE;
case 8: return SYSTEM;
case 9: return UNSUPPORTED_OPERATION;
+ case 10: return VALIDATION;
default: return null;
}
}
@@ -20846,113 +20867,113 @@ public final class UserBitShared {
"s.proto\032\022Coordination.proto\032\017SchemaDef.p" +
"roto\"$\n\017UserCredentials\022\021\n\tuser_name\030\001 \001" +
"(\t\"\'\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 " +
- "\001(\020\"\235\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
+ "\001(\020\"\255\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
"\n\010endpoint\030\002 \001(\0132\026.exec.DrillbitEndpoint" +
"\0227\n\nerror_type\030\003 \001(\0162#.exec.shared.Drill" +
"PBError.ErrorType\022\017\n\007message\030\004 \001(\t\0220\n\tex" +
"ception\030\005 \001(\0132\035.exec.shared.ExceptionWra" +
"pper\0220\n\rparsing_error\030\006 \003(\0132\031.exec.share",
- "d.ParsingError\"\242\001\n\tErrorType\022\016\n\nCONNECTI" +
+ "d.ParsingError\"\262\001\n\tErrorType\022\016\n\nCONNECTI" +
"ON\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDATA_WRITE\020\002\022\014\n\010F" +
"UNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPERMISSION\020\005\022\010\n\004" +
"PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SYSTEM\020\010\022\031\n\025UNSU" +
- "PPORTED_OPERATION\020\t\"\246\001\n\020ExceptionWrapper" +
- "\022\027\n\017exception_class\030\001 \001(\t\022\017\n\007message\030\002 \001" +
- "(\t\022:\n\013stack_trace\030\003 \003(\0132%.exec.shared.St" +
- "ackTraceElementWrapper\022,\n\005cause\030\004 \001(\0132\035." +
- "exec.shared.ExceptionWrapper\"\205\001\n\030StackTr" +
- "aceElementWrapper\022\022\n\nclass_name\030\001 \001(\t\022\021\n",
- "\tfile_name\030\002 \001(\t\022\023\n\013line_number\030\003 \001(\005\022\023\n" +
- "\013method_name\030\004 \001(\t\022\030\n\020is_native_method\030\005" +
- " \001(\010\"\\\n\014ParsingError\022\024\n\014start_column\030\002 \001" +
- "(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(" +
- "\005\022\017\n\007end_row\030\005 \001(\005\"~\n\016RecordBatchDef\022\024\n\014" +
- "record_count\030\001 \001(\005\022+\n\005field\030\002 \003(\0132\034.exec" +
- ".shared.SerializedField\022)\n!carries_two_b" +
- "yte_selection_vector\030\003 \001(\010\"\205\001\n\010NamePart\022" +
- "(\n\004type\030\001 \001(\0162\032.exec.shared.NamePart.Typ" +
- "e\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec.sh",
- "ared.NamePart\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARRAY" +
- "\020\001\"\351\001\n\017SerializedField\022%\n\nmajor_type\030\001 \001" +
- "(\0132\021.common.MajorType\022(\n\tname_part\030\002 \001(\013" +
- "2\025.exec.shared.NamePart\022+\n\005child\030\003 \003(\0132\034" +
- ".exec.shared.SerializedField\022\023\n\013value_co" +
- "unt\030\004 \001(\005\022\027\n\017var_byte_length\030\005 \001(\005\022\023\n\013gr" +
- "oup_count\030\006 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7" +
- "\n\nNodeStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_" +
- "footprint\030\002 \001(\003\"\206\002\n\013QueryResult\0228\n\013query" +
- "_state\030\001 \001(\0162#.exec.shared.QueryResult.Q",
- "ueryState\022&\n\010query_id\030\002 \001(\0132\024.exec.share" +
- "d.QueryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.D" +
- "rillPBError\"k\n\nQueryState\022\013\n\007PENDING\020\000\022\013" +
- "\n\007RUNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003" +
- "\022\n\n\006FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005" +
- "\"p\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.s" +
- "hared.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030" +
- "\003 \001(\0132\033.exec.shared.RecordBatchDef\"\227\001\n\tQ" +
- "ueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222" +
- "\n\005state\030\003 \001(\0162#.exec.shared.QueryResult.",
- "QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005" +
- " \001(\0132\026.exec.DrillbitEndpoint\"\272\003\n\014QueryPr" +
- "ofile\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
- "$\n\004type\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n" +
- "\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t" +
- "\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.D" +
- "rillbitEndpoint\0222\n\005state\030\010 \001(\0162#.exec.sh" +
- "ared.QueryResult.QueryState\022\027\n\017total_fra" +
- "gments\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005" +
- "\022;\n\020fragment_profile\030\013 \003(\0132!.exec.shared",
- ".MajorFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r" +
- "\n\005error\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010e" +
- "rror_id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\"t\n\024Maj" +
- "orFragmentProfile\022\031\n\021major_fragment_id\030\001" +
- " \001(\005\022A\n\026minor_fragment_profile\030\002 \003(\0132!.e" +
- "xec.shared.MinorFragmentProfile\"\350\002\n\024Mino" +
- "rFragmentProfile\022)\n\005state\030\001 \001(\0162\032.exec.s" +
- "hared.FragmentState\022(\n\005error\030\002 \001(\0132\031.exe" +
- "c.shared.DrillPBError\022\031\n\021minor_fragment_" +
- "id\030\003 \001(\005\0226\n\020operator_profile\030\004 \003(\0132\034.exe",
- "c.shared.OperatorProfile\022\022\n\nstart_time\030\005" +
- " \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 " +
- "\001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n\010endpoint" +
- "\030\t \001(\0132\026.exec.DrillbitEndpoint\022\023\n\013last_u" +
- "pdate\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\377\001\n\017O" +
- "peratorProfile\0221\n\rinput_profile\030\001 \003(\0132\032." +
- "exec.shared.StreamProfile\022\023\n\013operator_id" +
- "\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023\n\013setup_n" +
- "anos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033pea" +
- "k_local_memory_allocated\030\007 \001(\003\022(\n\006metric",
- "\030\010 \003(\0132\030.exec.shared.MetricValue\022\022\n\nwait" +
- "_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017\n\007records" +
- "\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003" +
- "\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlo" +
- "ng_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001*5\n\n" +
- "RpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020" +
- "\001\022\010\n\004USER\020\002*/\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOG" +
- "ICAL\020\002\022\014\n\010PHYSICAL\020\003*\207\001\n\rFragmentState\022\013" +
- "\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007" +
- "RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n",
- "\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\335" +
- "\005\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024" +
- "\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH" +
- "_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOI" +
- "N\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020" +
- "\007\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTI" +
- "TION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_" +
- "RECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r" +
- "\022\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAM" +
- "ING_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTER",
- "NAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_" +
- "SORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHI" +
- "VE_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\r" +
- "MOCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017D" +
- "IRECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEX" +
- "T_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_" +
- "SCHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025" +
- "\n\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020" +
- "!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rA" +
- "VRO_SUB_SCAN\020$B.\n\033org.apache.drill.exec.",
- "protoB\rUserBitSharedH\001"
+ "PPORTED_OPERATION\020\t\022\016\n\nVALIDATION\020\n\"\246\001\n\020" +
+ "ExceptionWrapper\022\027\n\017exception_class\030\001 \001(" +
+ "\t\022\017\n\007message\030\002 \001(\t\022:\n\013stack_trace\030\003 \003(\0132" +
+ "%.exec.shared.StackTraceElementWrapper\022," +
+ "\n\005cause\030\004 \001(\0132\035.exec.shared.ExceptionWra" +
+ "pper\"\205\001\n\030StackTraceElementWrapper\022\022\n\ncla",
+ "ss_name\030\001 \001(\t\022\021\n\tfile_name\030\002 \001(\t\022\023\n\013line" +
+ "_number\030\003 \001(\005\022\023\n\013method_name\030\004 \001(\t\022\030\n\020is" +
+ "_native_method\030\005 \001(\010\"\\\n\014ParsingError\022\024\n\014" +
+ "start_column\030\002 \001(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n" +
+ "\nend_column\030\004 \001(\005\022\017\n\007end_row\030\005 \001(\005\"~\n\016Re" +
+ "cordBatchDef\022\024\n\014record_count\030\001 \001(\005\022+\n\005fi" +
+ "eld\030\002 \003(\0132\034.exec.shared.SerializedField\022" +
+ ")\n!carries_two_byte_selection_vector\030\003 \001" +
+ "(\010\"\205\001\n\010NamePart\022(\n\004type\030\001 \001(\0162\032.exec.sha" +
+ "red.NamePart.Type\022\014\n\004name\030\002 \001(\t\022$\n\005child",
+ "\030\003 \001(\0132\025.exec.shared.NamePart\"\033\n\004Type\022\010\n" +
+ "\004NAME\020\000\022\t\n\005ARRAY\020\001\"\351\001\n\017SerializedField\022%" +
+ "\n\nmajor_type\030\001 \001(\0132\021.common.MajorType\022(\n" +
+ "\tname_part\030\002 \001(\0132\025.exec.shared.NamePart\022" +
+ "+\n\005child\030\003 \003(\0132\034.exec.shared.SerializedF" +
+ "ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" +
+ "ngth\030\005 \001(\005\022\023\n\013group_count\030\006 \001(\005\022\025\n\rbuffe" +
+ "r_length\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007node_id\030" +
+ "\001 \001(\005\022\030\n\020memory_footprint\030\002 \001(\003\"\206\002\n\013Quer" +
+ "yResult\0228\n\013query_state\030\001 \001(\0162#.exec.shar",
+ "ed.QueryResult.QueryState\022&\n\010query_id\030\002 " +
+ "\001(\0132\024.exec.shared.QueryId\022(\n\005error\030\003 \003(\013" +
+ "2\031.exec.shared.DrillPBError\"k\n\nQueryStat" +
+ "e\022\013\n\007PENDING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCOMPLETED" +
+ "\020\002\022\014\n\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026CANCELLA" +
+ "TION_REQUESTED\020\005\"p\n\tQueryData\022&\n\010query_i" +
+ "d\030\001 \001(\0132\024.exec.shared.QueryId\022\021\n\trow_cou" +
+ "nt\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.Reco" +
+ "rdBatchDef\"\227\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022" +
+ "\r\n\005start\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.sha",
+ "red.QueryResult.QueryState\022\017\n\004user\030\004 \001(\t" +
+ ":\001-\022\'\n\007foreman\030\005 \001(\0132\026.exec.DrillbitEndp" +
+ "oint\"\272\003\n\014QueryProfile\022 \n\002id\030\001 \001(\0132\024.exec" +
+ ".shared.QueryId\022$\n\004type\030\002 \001(\0162\026.exec.sha" +
+ "red.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(" +
+ "\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007forema" +
+ "n\030\007 \001(\0132\026.exec.DrillbitEndpoint\0222\n\005state" +
+ "\030\010 \001(\0162#.exec.shared.QueryResult.QuerySt" +
+ "ate\022\027\n\017total_fragments\030\t \001(\005\022\032\n\022finished" +
+ "_fragments\030\n \001(\005\022;\n\020fragment_profile\030\013 \003",
+ "(\0132!.exec.shared.MajorFragmentProfile\022\017\n" +
+ "\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024\n\014verbose" +
+ "Error\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022\n\nerror_n" +
+ "ode\030\020 \001(\t\"t\n\024MajorFragmentProfile\022\031\n\021maj" +
+ "or_fragment_id\030\001 \001(\005\022A\n\026minor_fragment_p" +
+ "rofile\030\002 \003(\0132!.exec.shared.MinorFragment" +
+ "Profile\"\350\002\n\024MinorFragmentProfile\022)\n\005stat" +
+ "e\030\001 \001(\0162\032.exec.shared.FragmentState\022(\n\005e" +
+ "rror\030\002 \001(\0132\031.exec.shared.DrillPBError\022\031\n" +
+ "\021minor_fragment_id\030\003 \001(\005\0226\n\020operator_pro",
+ "file\030\004 \003(\0132\034.exec.shared.OperatorProfile" +
+ "\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023" +
+ "\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010" +
+ " \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.DrillbitEn" +
+ "dpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_prog" +
+ "ress\030\013 \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_" +
+ "profile\030\001 \003(\0132\032.exec.shared.StreamProfil" +
+ "e\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_type\030" +
+ "\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_na" +
+ "nos\030\006 \001(\003\022#\n\033peak_local_memory_allocated",
+ "\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.shared.Met" +
+ "ricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamPr" +
+ "ofile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022" +
+ "\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetri" +
+ "c_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double" +
+ "_value\030\003 \001(\001*5\n\nRpcChannel\022\017\n\013BIT_CONTRO" +
+ "L\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\tQueryType" +
+ "\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*\207\001\n" +
+ "\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_" +
+ "ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022",
+ "\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATI" +
+ "ON_REQUESTED\020\006*\335\005\n\020CoreOperatorType\022\021\n\rS" +
+ "INGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006" +
+ "FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOI" +
+ "N\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SE" +
+ "NDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVER\020\010\022" +
+ "\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PROJECT" +
+ "\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\020\n\014RANGE_SEND" +
+ "ER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_REM" +
+ "OVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_N",
+ "_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n" +
+ "\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_GR" +
+ "OUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM_" +
+ "TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQU" +
+ "ET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEXT" +
+ "_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SUB" +
+ "_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017COM" +
+ "PLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 \022\022\n" +
+ "\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_" +
+ "LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$B.\n\033org.ap",
+ "ache.drill.exec.protoB\rUserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillPBError.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillPBError.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillPBError.java
index 873ffa4..ee237d9 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillPBError.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/DrillPBError.java
@@ -46,7 +46,8 @@ public final class DrillPBError implements Externalizable, Message<DrillPBError>
PLAN(6),
RESOURCE(7),
SYSTEM(8),
- UNSUPPORTED_OPERATION(9);
+ UNSUPPORTED_OPERATION(9),
+ VALIDATION(10);
public final int number;
@@ -74,6 +75,7 @@ public final class DrillPBError implements Externalizable, Message<DrillPBError>
case 7: return RESOURCE;
case 8: return SYSTEM;
case 9: return UNSUPPORTED_OPERATION;
+ case 10: return VALIDATION;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index e3b6168..68c8612 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -77,6 +77,11 @@ message DrillPBError{
* - schema change
*/
UNSUPPORTED_OPERATION = 9;
+ /* SQL validation exception
+ * - invalid schema path
+ * - invalid entries in SQL tree
+ */
+ VALIDATION = 10;
}
optional string error_id = 1; // for debug tracing purposes
optional DrillbitEndpoint endpoint = 2;
[3/4] drill git commit: DRILL-2476: Added BatchState.STOP in
buildSchema() so AbstractRecordBatch returns IterOutcome.STOP
Posted by ve...@apache.org.
DRILL-2476: Added BatchState.STOP in buildSchema() so AbstractRecordBatch returns IterOutcome.STOP
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/eb79e805
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/eb79e805
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/eb79e805
Branch: refs/heads/master
Commit: eb79e805fcc01280a77e3525cdd2638c9f22b5d1
Parents: 6cc89e9
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Wed May 6 14:24:26 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:06 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/physical/impl/TopN/TopNBatch.java | 1 +
.../drill/exec/physical/impl/WriterRecordBatch.java | 4 ----
.../drill/exec/physical/impl/join/HashJoinBatch.java | 5 +++++
.../drill/exec/physical/impl/join/MergeJoinBatch.java | 12 ++++++++++--
.../exec/physical/impl/join/NestedLoopJoinBatch.java | 5 +++++
.../physical/impl/mergereceiver/MergingRecordBatch.java | 3 ++-
.../exec/physical/impl/xsort/ExternalSortBatch.java | 1 +
7 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c3e70f5..1cf6213 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -119,6 +119,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
super.close();
}
+ @Override
public void buildSchema() throws SchemaChangeException {
VectorContainer c = new VectorContainer(oContext);
IterOutcome outcome = next(incoming);
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 28a99d9..d5d64a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -77,10 +77,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
}
@Override
- public void buildSchema() throws SchemaChangeException {
- }
-
- @Override
public IterOutcome innerNext() {
if(processed) {
// cleanup();
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 56ce0ee..6490251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -172,6 +172,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
leftUpstream = next(left);
rightUpstream = next(right);
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return;
+ }
+
if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return;
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 0430f1b..026d79e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -134,7 +134,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
comparator = JoinUtils.checkAndSetComparison(condition, comparator);
}
assert comparator != JoinComparator.NONE;
- areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
+ areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM);
}
public JoinRelType getJoinType() {
@@ -146,10 +146,18 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
return status.getOutPosition();
}
+ @Override
public void buildSchema() throws SchemaChangeException {
status.ensureInitial();
- if (status.getLastLeft() == IterOutcome.OUT_OF_MEMORY || status.getLastRight() == IterOutcome.OUT_OF_MEMORY) {
+ final IterOutcome leftOutcome = status.getLastLeft();
+ final IterOutcome rightOutcome = status.getLastRight();
+ if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return;
+ }
+
+ if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4c86f5c..de0d8e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -277,6 +277,11 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
leftUpstream = next(LEFT_INPUT, left);
rightUpstream = next(RIGHT_INPUT, right);
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return;
+ }
+
if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
state = BatchState.OUT_OF_MEMORY;
return;
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 5d990f0..b28b7b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -460,7 +460,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return outgoingContainer.getSchema();
}
- public void buildSchema() {
+ @Override
+ public void buildSchema() throws SchemaChangeException {
// find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
tempBatchHolder = new RawFragmentBatch[fragProviders.length];
int i = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/eb79e805/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 3159811..5cdd2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -171,6 +171,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
super.close();
}
+ @Override
public void buildSchema() throws SchemaChangeException {
IterOutcome outcome = next(incoming);
switch (outcome) {
[2/4] drill git commit: DRILL-3010: Convert bad command error
messages into UserExceptions in SqlHandlers
Posted by ve...@apache.org.
DRILL-3010: Convert bad command error messages into UserExceptions in SqlHandlers
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6cc89e99
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6cc89e99
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6cc89e99
Branch: refs/heads/master
Commit: 6cc89e99c7f557ffb04f16c0857d3b03a744fad4
Parents: 7c5a1f5
Author: vkorukanti <ve...@gmail.com>
Authored: Fri Apr 17 17:20:10 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:05 2015 -0700
----------------------------------------------------------------------
.../drill/common/exceptions/UserException.java | 26 +++
.../org/apache/drill/exec/ops/QueryContext.java | 1 -
.../drill/exec/planner/sql/SchemaUtilites.java | 179 +++++++++++++++
.../sql/handlers/AbstractSqlHandler.java | 55 -----
.../sql/handlers/CreateTableHandler.java | 66 +++---
.../sql/handlers/DescribeTableHandler.java | 30 ++-
.../planner/sql/handlers/ShowFileHandler.java | 64 +++---
.../planner/sql/handlers/ShowTablesHandler.java | 17 +-
.../planner/sql/handlers/SqlHandlerUtil.java | 18 +-
.../planner/sql/handlers/UseSchemaHandler.java | 19 +-
.../exec/planner/sql/handlers/ViewHandler.java | 134 +++++-------
.../apache/drill/exec/rpc/user/UserSession.java | 72 ++++--
.../apache/drill/exec/store/AbstractSchema.java | 31 ++-
.../exec/store/dfs/WorkspaceSchemaFactory.java | 5 +-
.../java/org/apache/drill/BaseTestQuery.java | 2 +
.../TestImpersonationMetadata.java | 8 +-
.../physical/impl/writer/TestParquetWriter.java | 46 ----
.../org/apache/drill/exec/sql/TestCTAS.java | 64 ++++--
.../apache/drill/exec/sql/TestInfoSchema.java | 28 ++-
.../apache/drill/exec/sql/TestViewSupport.java | 86 ++------
.../apache/drill/exec/proto/UserBitShared.java | 219 ++++++++++---------
.../drill/exec/proto/beans/DrillPBError.java | 4 +-
protocol/src/main/protobuf/UserBitShared.proto | 5 +
23 files changed, 669 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 21859ed..a67cb3f 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -213,6 +213,32 @@ public class UserException extends DrillRuntimeException {
/**
* Creates a new user exception builder .
*
+ * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#VALIDATION
+ * @return user exception builder
+ */
+ public static Builder validationError() {
+ return validationError(null);
+ }
+
+ /**
+ * wraps the passed exception inside a system error.
+ * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
+ * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+ * of creating a new exception. Any added context will be added to the user exception as well.
+ *
+ * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#VALIDATION
+ *
+ * @param cause exception we want the user exception to wrap. If cause is, or wrap, a user exception it will be
+ * returned by the builder instead of creating a new user exception
+ * @return user exception builder
+ */
+ public static Builder validationError(Throwable cause) {
+ return new Builder(DrillPBError.ErrorType.VALIDATION, cause);
+ }
+
+ /**
+ * creates a new user exception builder .
+ *
* @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PERMISSION
* @return user exception builder
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 9e2f210..8917a24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -27,7 +27,6 @@ import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
new file mode 100644
index 0000000..655e135
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.AbstractSchema;
+
+import java.util.Collections;
+import java.util.List;
+
+public class SchemaUtilites {
+ public static final Joiner SCHEMA_PATH_JOINER = Joiner.on(".").skipNulls();
+
+ /**
+ * Search and return schema with given schemaPath. First search in schema tree starting from defaultSchema,
+ * if not found search starting from rootSchema. Root schema tree is derived from the defaultSchema reference.
+ *
+ * @param defaultSchema Reference to the default schema in complete schema tree.
+ * @param schemaPath Schema path to search.
+ * @return SchemaPlus object.
+ */
+ public static SchemaPlus findSchema(final SchemaPlus defaultSchema, final List<String> schemaPath) {
+ if (schemaPath.size() == 0) {
+ return defaultSchema;
+ }
+
+ SchemaPlus schema;
+ if ((schema = searchSchemaTree(defaultSchema, schemaPath)) != null) {
+ return schema;
+ }
+
+ SchemaPlus rootSchema = defaultSchema;
+ while(rootSchema.getParentSchema() != null) {
+ rootSchema = rootSchema.getParentSchema();
+ }
+
+ if (rootSchema != defaultSchema &&
+ (schema = searchSchemaTree(rootSchema, schemaPath)) != null) {
+ return schema;
+ }
+
+ return null;
+ }
+
+ /**
+ * Same utility as {@link #findSchema(SchemaPlus, List)} except the search schema path given here is complete path
+ * instead of list. Use "." separator to divided the schema into nested schema names.
+ * @param defaultSchema
+ * @param schemaPath
+ * @return
+ * @throws ValidationException
+ */
+ public static SchemaPlus findSchema(final SchemaPlus defaultSchema, final String schemaPath) {
+ final List<String> schemaPathAsList = Lists.newArrayList(schemaPath.split("\\."));
+ return findSchema(defaultSchema, schemaPathAsList);
+ }
+
+ /** Utility method to search for schema path starting from the given <i>schema</i> reference */
+ private static SchemaPlus searchSchemaTree(SchemaPlus schema, final List<String> schemaPath) {
+ for (String schemaName : schemaPath) {
+ schema = schema.getSubSchema(schemaName);
+ if (schema == null) {
+ return null;
+ }
+ }
+ return schema;
+ }
+
+ /**
+ * Returns true if the given <i>schema</i> is root schema. False otherwise.
+ * @param schema
+ * @return
+ */
+ public static boolean isRootSchema(SchemaPlus schema) {
+ return schema.getParentSchema() == null;
+ }
+
+ /**
+ * Unwrap given <i>SchemaPlus</i> instance as Drill schema instance (<i>AbstractSchema</i>). Once unwrapped, return
+ * default schema from <i>AbstractSchema</i>. If the given schema is not an instance of <i>AbstractSchema</i> a
+ * {@link UserException} is thrown.
+ */
+ public static AbstractSchema unwrapAsDrillSchemaInstance(SchemaPlus schemaPlus) {
+ try {
+ return schemaPlus.unwrap(AbstractSchema.class).getDefaultSchema();
+ } catch (ClassCastException e) {
+ throw UserException.validationError(e)
+ .message("Schema [%s] is not a Drill schema.", getSchemaPath(schemaPlus))
+ .build();
+ }
+ }
+
+ /** Utility method to get the schema path for given schema instance. */
+ public static String getSchemaPath(SchemaPlus schema) {
+ return SCHEMA_PATH_JOINER.join(getSchemaPathAsList(schema));
+ }
+
+ /** Utility method to get the schema path as list for given schema instance. */
+ public static List<String> getSchemaPathAsList(SchemaPlus schema) {
+ if (isRootSchema(schema)) {
+ return Collections.EMPTY_LIST;
+ }
+
+ List<String> path = Lists.newArrayListWithCapacity(5);
+ while(schema != null) {
+ final String name = schema.getName();
+ if (!Strings.isNullOrEmpty(name)) {
+ path.add(schema.getName());
+ }
+ schema = schema.getParentSchema();
+ }
+
+ return Lists.reverse(path);
+ }
+
+ /** Utility method to throw {@link UserException} with context information */
+ public static void throwSchemaNotFoundException(final SchemaPlus defaultSchema, final String givenSchemaPath) {
+ throw UserException.validationError()
+ .message("Schema [%s] is not valid with respect to either root schema or current default schema.",
+ givenSchemaPath)
+ .addContext("Current default schema: ",
+ isRootSchema(defaultSchema) ? "No default schema selected" : getSchemaPath(defaultSchema))
+ .build();
+ }
+
+ /**
+ * Given reference to default schema in schema tree, search for schema with given <i>schemaPath</i>. Once a schema is
+ * found resolve it into a mutable <i>AbstractDrillSchema</i> instance. A {@link UserException} is throws when:
+ * 1. No schema for given <i>schemaPath</i> is found,
+ * 2. Schema found for given <i>schemaPath</i> is a root schema
+ * 3. Resolved schema is not a mutable schema.
+ * @param defaultSchema
+ * @param schemaPath
+ * @return
+ */
+ public static AbstractSchema resolveToMutableDrillSchema(final SchemaPlus defaultSchema, List<String> schemaPath) {
+ final SchemaPlus schema = findSchema(defaultSchema, schemaPath);
+
+ if (schema == null) {
+ throwSchemaNotFoundException(defaultSchema, SCHEMA_PATH_JOINER.join(schemaPath));
+ }
+
+ if (isRootSchema(schema)) {
+ throw UserException.parseError()
+ .message("Root schema is immutable. Creating or dropping tables/views is not allowed in root schema." +
+ "Select a schema using 'USE schema' command.")
+ .build();
+ }
+
+ final AbstractSchema drillSchema = unwrapAsDrillSchemaInstance(schema);
+ if (!drillSchema.isMutable()) {
+ throw UserException.parseError()
+ .message("Unable to create or drop tables/views. Schema [%s] is immutable.", getSchemaPath(schema))
+ .build();
+ }
+
+ return drillSchema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
index 96fd877..6ce25a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlHandler.java
@@ -43,59 +43,4 @@ public abstract class AbstractSqlHandler {
throw new ForemanSetupException(String.format("Failure trying to treat %s as type %s.", o.getClass().getSimpleName(), clazz.getSimpleName()));
}
}
-
- /**
- * From a given SchemaPlus return a Drill schema object of type AbstractSchema if exists.
- * Otherwise throw errors.
- */
- public static AbstractSchema getDrillSchema(SchemaPlus schemaPlus) throws Exception {
- AbstractSchema drillSchema;
- try {
- drillSchema = schemaPlus.unwrap(AbstractSchema.class);
- drillSchema = drillSchema.getDefaultSchema();
- } catch (ClassCastException e) {
- throw new Exception("Current schema is not a Drill schema. " +
- "Can't create new relations (tables or views) in non-Drill schemas.", e);
- }
-
- return drillSchema;
- }
-
- /**
- * Search for a schema with given schemaPath. First search in schema tree rooted at defaultSchema,
- * if not found search in rootSchema. If no schema found throw errors.
- */
- public static SchemaPlus findSchema(SchemaPlus rootSchema, SchemaPlus defaultSchema, List<String> schemaPath)
- throws Exception {
- if (schemaPath.size() == 0) {
- return defaultSchema;
- }
-
- SchemaPlus schema;
-
- if ((schema = searchSchemaTree(defaultSchema, schemaPath)) != null) {
- return schema;
- }
-
- if ((schema = searchSchemaTree(rootSchema, schemaPath)) != null) {
- return schema;
- }
-
- throw new Exception(String.format("Invalid schema path '%s'.", Joiner.on(".").join(schemaPath)));
- }
-
- public static boolean isRootSchema(SchemaPlus schema) {
- return schema.getParentSchema() == null;
- }
-
- private static SchemaPlus searchSchemaTree(SchemaPlus schema, List<String> schemaPath) {
- for (String schemaName : schemaPath) {
- schema = schema.getSubSchema(schemaName);
- if (schema == null) {
- return null;
- }
- }
- return schema;
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index e9ac1e1..2866b8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.planner.sql.handlers;
import java.io.IOException;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.TypedSqlNode;
-import org.apache.calcite.tools.Planner;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.logical.DrillRel;
@@ -32,17 +32,12 @@ import org.apache.drill.exec.planner.logical.DrillScreenRel;
import org.apache.drill.exec.planner.logical.DrillStoreRel;
import org.apache.drill.exec.planner.logical.DrillWriterRel;
import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.hep.HepPlanner;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlNode;
public class CreateTableHandler extends DefaultSqlHandler {
public CreateTableHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
@@ -53,43 +48,32 @@ public class CreateTableHandler extends DefaultSqlHandler {
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
SqlCreateTable sqlCreateTable = unwrap(sqlNode, SqlCreateTable.class);
- try {
- final RelNode newTblRelNode =
- SqlHandlerUtil.resolveNewTableRel(false, planner, sqlCreateTable.getFieldNames(), sqlCreateTable.getQuery());
+ final String newTblName = sqlCreateTable.getName();
+ final RelNode newTblRelNode =
+ SqlHandlerUtil.resolveNewTableRel(false, planner, sqlCreateTable.getFieldNames(), sqlCreateTable.getQuery());
- SchemaPlus schema = findSchema(context.getRootSchema(), context.getNewDefaultSchema(),
- sqlCreateTable.getSchemaPath());
+ final AbstractSchema drillSchema =
+ SchemaUtilites.resolveToMutableDrillSchema(context.getNewDefaultSchema(), sqlCreateTable.getSchemaPath());
+ final String schemaPath = drillSchema.getFullSchemaName();
- AbstractSchema drillSchema = getDrillSchema(schema);
-
- if (!drillSchema.isMutable()) {
- return DirectPlan.createDirectPlan(context, false, String.format("Unable to create table. " +
- "Schema [%s] is immutable. ", drillSchema.getFullSchemaName()));
- }
-
- final String newTblName = sqlCreateTable.getName();
- if (SqlHandlerUtil.getTableFromSchema(drillSchema, newTblName) != null) {
- throw new ValidationException(
- String.format("A table or view with given name [%s] already exists in schema [%s]",
- newTblName, drillSchema.getFullSchemaName()));
- }
+ if (SqlHandlerUtil.getTableFromSchema(drillSchema, newTblName) != null) {
+ throw UserException.validationError()
+ .message("A table or view with given name [%s] already exists in schema [%s]", newTblName, schemaPath)
+ .build();
+ }
- log("Optiq Logical", newTblRelNode);
+ log("Optiq Logical", newTblRelNode);
- // Convert the query to Drill Logical plan and insert a writer operator on top.
- DrillRel drel = convertToDrel(newTblRelNode, drillSchema, newTblName);
- log("Drill Logical", drel);
- Prel prel = convertToPrel(drel);
- log("Drill Physical", prel);
- PhysicalOperator pop = convertToPop(prel);
- PhysicalPlan plan = convertToPlan(pop);
- log("Drill Plan", plan);
+ // Convert the query to Drill Logical plan and insert a writer operator on top.
+ DrillRel drel = convertToDrel(newTblRelNode, drillSchema, newTblName);
+ log("Drill Logical", drel);
+ Prel prel = convertToPrel(drel);
+ log("Drill Physical", prel);
+ PhysicalOperator pop = convertToPop(prel);
+ PhysicalPlan plan = convertToPlan(pop);
+ log("Drill Plan", plan);
- return plan;
- } catch(Exception e) {
- logger.error("Failed to create table '{}'", sqlCreateTable.getName(), e);
- return DirectPlan.createDirectPlan(context, false, String.format("Error: %s", e.getMessage()));
- }
+ return plan;
}
private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String tableName) throws RelConversionException {
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
index c76914b..81defa3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
@@ -18,18 +18,17 @@
package org.apache.drill.exec.planner.sql.handlers;
-import static org.apache.drill.exec.planner.sql.parser.DrillParserUtil.CHARSET;
-
import java.util.List;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.*;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable;
-import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
@@ -42,6 +41,8 @@ import org.apache.calcite.util.Util;
import com.google.common.collect.ImmutableList;
+import static org.apache.drill.exec.planner.sql.parser.DrillParserUtil.CHARSET;
+
public class DescribeTableHandler extends DefaultSqlHandler {
public DescribeTableHandler(SqlHandlerConfig config) { super(config); }
@@ -61,22 +62,25 @@ public class DescribeTableHandler extends DefaultSqlHandler {
ImmutableList.of(IS_SCHEMA_NAME, TAB_COLUMNS), null, SqlParserPos.ZERO, null);
final SqlIdentifier table = node.getTable();
- final SchemaPlus schema = findSchema(context.getRootSchema(), context.getNewDefaultSchema(),
- Util.skipLast(table.names));
+ final SchemaPlus schema = SchemaUtilites.findSchema(context.getNewDefaultSchema(), Util.skipLast(table.names));
+
final String tableName = Util.last(table.names);
+ // find resolved schema path
+ final String schemaPath = SchemaUtilites.getSchemaPath(schema);
+
if (schema.getTable(tableName) == null) {
- throw new RelConversionException(String.format("Table %s is not valid", Util.sepList(table.names, ".")));
+ throw UserException.validationError()
+ .message("Unknown table [%s] in schema [%s]", tableName, schemaPath)
+ .build();
}
SqlNode schemaCondition = null;
- if (!isRootSchema(schema)) {
- AbstractSchema drillSchema = getDrillSchema(schema);
-
+ if (!SchemaUtilites.isRootSchema(schema)) {
schemaCondition = DrillParserUtil.createCondition(
new SqlIdentifier(SHRD_COL_TABLE_SCHEMA, SqlParserPos.ZERO),
SqlStdOperatorTable.EQUALS,
- SqlLiteral.createCharString(drillSchema.getFullSchemaName(), CHARSET, SqlParserPos.ZERO)
+ SqlLiteral.createCharString(schemaPath, CHARSET, SqlParserPos.ZERO)
);
}
@@ -106,7 +110,9 @@ public class DescribeTableHandler extends DefaultSqlHandler {
return new SqlSelect(SqlParserPos.ZERO, null, new SqlNodeList(selectList, SqlParserPos.ZERO),
fromClause, where, null, null, null, null, null, null);
} catch (Exception ex) {
- throw new RelConversionException("Error while rewriting DESCRIBE query: " + ex.getMessage(), ex);
+ throw UserException.planError(ex)
+ .message("Error while rewriting DESCRIBE query: %d", ex.getMessage())
+ .build();
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index 7062375..c96dc73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -25,8 +25,10 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
@@ -53,43 +55,43 @@ public class ShowFileHandler extends DefaultSqlHandler {
String defaultLocation = null;
String fromDir = "./";
- try {
- SchemaPlus defaultSchema = context.getNewDefaultSchema();
- SchemaPlus drillSchema = defaultSchema;
-
- // Show files can be used without from clause, in which case we display the files in the default schema
- if (from != null) {
- // We are not sure if the full from clause is just the schema or includes table name, first try to see if the full path specified is a schema
- try {
- drillSchema = findSchema(context.getRootSchema(), defaultSchema, from.names);
- } catch (Exception e) {
- // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
- drillSchema = findSchema(context.getRootSchema(), defaultSchema, from.names.subList(0, from.names.size() - 1));
- fromDir = fromDir + from.names.get((from.names.size() - 1));
- }
+ SchemaPlus defaultSchema = context.getNewDefaultSchema();
+ SchemaPlus drillSchema = defaultSchema;
+
+ // Show files can be used without from clause, in which case we display the files in the default schema
+ if (from != null) {
+ // We are not sure if the full from clause is just the schema or includes table name,
+ // first try to see if the full path specified is a schema
+ drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names);
+ if (drillSchema == null) {
+ // Entire from clause is not a schema, try to obtain the schema without the last part of the specified clause.
+ drillSchema = SchemaUtilites.findSchema(defaultSchema, from.names.subList(0, from.names.size() - 1));
+ fromDir = fromDir + from.names.get((from.names.size() - 1));
}
- AbstractSchema tempSchema = getDrillSchema(drillSchema);
- WorkspaceSchema schema = null;
- if (tempSchema instanceof WorkspaceSchema) {
- schema = ((WorkspaceSchema)tempSchema);
- } else {
- throw new ValidationException("Unsupported schema");
+ if (drillSchema == null) {
+ throw UserException.validationError()
+ .message("Invalid FROM/IN clause [%s]", from.toString())
+ .build();
}
+ }
- // Get the file system object
- fs = schema.getFS();
-
- // Get the default path
- defaultLocation = schema.getDefaultLocation();
- } catch (Exception e) {
- if (from == null) {
- return DirectPlan.createDirectPlan(context, false, "Show files without FROM / IN clause can be used only after specifying a default file system schema");
- }
- return DirectPlan.createDirectPlan(context, false, String.format("Current schema '%s' is not a file system schema. " +
- "Can't execute show files on this schema.", from.toString()));
+ WorkspaceSchema wsSchema;
+ try {
+ wsSchema = (WorkspaceSchema) drillSchema.unwrap(AbstractSchema.class).getDefaultSchema();
+ } catch (ClassCastException e) {
+ throw UserException.validationError()
+ .message("SHOW FILES is supported in workspace type schema only. Schema [%s] is not a workspace schema.",
+ SchemaUtilites.getSchemaPath(drillSchema))
+ .build();
}
+ // Get the file system object
+ fs = wsSchema.getFS();
+
+ // Get the default path
+ defaultLocation = wsSchema.getDefaultLocation();
+
List<ShowFilesCommandResult> rows = new ArrayList<>();
for (FileStatus fileStatus : fs.list(false, new Path(defaultLocation, fromDir))) {
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index 3d42f76..055b761 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.RelConversionException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
import org.apache.drill.exec.planner.sql.parser.SqlShowTables;
import org.apache.drill.exec.store.AbstractSchema;
@@ -67,19 +69,14 @@ public class ShowTablesHandler extends DefaultSqlHandler {
// If no schema is given in SHOW TABLES command, list tables from current schema
SchemaPlus schema = context.getNewDefaultSchema();
- if (isRootSchema(schema)) {
+ if (SchemaUtilites.isRootSchema(schema)) {
// If the default schema is a root schema, throw an error to select a default schema
- throw new RelConversionException("No schema selected. Select a schema using 'USE schema' command");
- }
-
- AbstractSchema drillSchema;
-
- try {
- drillSchema = getDrillSchema(schema);
- } catch(Exception ex) {
- throw new RelConversionException("Error while rewriting SHOW TABLES query: " + ex.getMessage(), ex);
+ throw UserException.validationError()
+ .message("No default schema selected. Select a schema using 'USE schema' command")
+ .build();
}
+ final AbstractSchema drillSchema = SchemaUtilites.unwrapAsDrillSchemaInstance(schema);
tableSchema = drillSchema.getFullSchemaName();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index 7ae5e0d..3edcdb2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -23,6 +23,8 @@ import org.apache.calcite.sql.TypedSqlNode;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
@@ -78,17 +80,19 @@ public class SqlHandlerUtil {
// Field count should match.
if (tableFieldNames.size() != queryRowType.getFieldCount()) {
final String tblType = isNewTableView ? "view" : "table";
- throw new ValidationException(
- String.format("%s's field list and the %s's query field list have different counts.", tblType, tblType));
+ throw UserException.validationError()
+ .message("%s's field list and the %s's query field list have different counts.", tblType, tblType)
+ .build();
}
// CTAS's query field list shouldn't have "*" when table's field list is specified.
for (String field : queryRowType.getFieldNames()) {
if (field.equals("*")) {
final String tblType = isNewTableView ? "view" : "table";
- throw new ValidationException(
- String.format("%s's query field list has a '*', which is invalid when %s's field list is specified.",
- tblType, tblType));
+ throw UserException.validationError()
+ .message("%s's query field list has a '*', which is invalid when %s's field list is specified.",
+ tblType, tblType)
+ .build();
}
}
@@ -119,12 +123,12 @@ public class SqlHandlerUtil {
}
}
- public static Table getTableFromSchema(AbstractSchema drillSchema, String tblName) throws DrillException {
+ public static Table getTableFromSchema(AbstractSchema drillSchema, String tblName) {
try {
return drillSchema.getTable(tblName);
} catch (Exception e) {
// TODO: Move to better exception types.
- throw new DrillException(
+ throw new DrillRuntimeException(
String.format("Failure while trying to check if a table or view with given name [%s] already exists " +
"in schema [%s]: %s", tblName, drillSchema.getFullSchemaName(), e.getMessage()), e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
index e17e275..0ec6eb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
@@ -19,9 +19,10 @@ package org.apache.drill.exec.planner.sql.handlers;
import java.io.IOException;
+import com.google.common.base.Strings;
+
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
-
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
@@ -38,18 +39,12 @@ public class UseSchemaHandler extends AbstractSqlHandler {
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
- SqlUseSchema useSchema = unwrap(sqlNode, SqlUseSchema.class);
-
- String defaultSchema = useSchema.getSchema();
- boolean status = context.getSession().setDefaultSchemaPath(defaultSchema, context.getRootSchema());
+ final SqlUseSchema useSchema = unwrap(sqlNode, SqlUseSchema.class);
+ final String newDefaultSchemaPath = useSchema.getSchema();
- String msg;
- if (status) {
- msg = String.format("Default schema changed to '%s'", defaultSchema);
- } else {
- msg = String.format("Failed to change default schema to '%s'", defaultSchema);
- }
+ context.getSession().setDefaultSchemaPath(newDefaultSchemaPath, context.getNewDefaultSchema());
- return DirectPlan.createDirectPlan(context, status, msg);
+ return DirectPlan.createDirectPlan(context, true,
+ String.format("Default schema changed to [%s]", context.getSession().getDefaultSchemaPath()));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
index c59c3a2..0a3393e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.planner.sql.handlers;
import java.io.IOException;
-import java.util.List;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Schema.TableType;
@@ -28,20 +27,19 @@ import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.dotdrill.View;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.planner.sql.parser.SqlCreateView;
import org.apache.drill.exec.planner.sql.parser.SqlDropView;
import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.SqlNode;
-import com.google.common.collect.ImmutableList;
-
public abstract class ViewHandler extends AbstractSqlHandler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ViewHandler.class);
@@ -64,65 +62,46 @@ public abstract class ViewHandler extends AbstractSqlHandler {
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
SqlCreateView createView = unwrap(sqlNode, SqlCreateView.class);
- try {
- // Store the viewSql as view def SqlNode is modified as part of the resolving the new table definition below.
- final String viewSql = createView.getQuery().toString();
+ final String newViewName = createView.getName();
- final RelNode newViewRelNode =
- SqlHandlerUtil.resolveNewTableRel(true, planner, createView.getFieldNames(), createView.getQuery());
+ // Store the viewSql as view def SqlNode is modified as part of the resolving the new table definition below.
+ final String viewSql = createView.getQuery().toString();
- SchemaPlus defaultSchema = context.getNewDefaultSchema();
- SchemaPlus schema = findSchema(context.getRootSchema(), defaultSchema, createView.getSchemaPath());
- AbstractSchema drillSchema = getDrillSchema(schema);
+ final RelNode newViewRelNode =
+ SqlHandlerUtil.resolveNewTableRel(true, planner, createView.getFieldNames(), createView.getQuery());
- String schemaPath = drillSchema.getFullSchemaName();
- if (!drillSchema.isMutable()) {
- return DirectPlan.createDirectPlan(context, false, String.format("Unable to create view. " +
- "Schema [%s] is immutable. ", schemaPath));
- }
+ final SchemaPlus defaultSchema = context.getNewDefaultSchema();
+ final AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, createView.getSchemaPath());
- // find current workspace schema path
- List<String> workspaceSchemaPath = ImmutableList.of();
- if (!isRootSchema(defaultSchema)) {
- workspaceSchemaPath = getDrillSchema(defaultSchema).getSchemaPath();
- }
+ final String schemaPath = drillSchema.getFullSchemaName();
+ final View view = new View(newViewName, viewSql, newViewRelNode.getRowType(),
+ SchemaUtilites.getSchemaPathAsList(defaultSchema));
+
+ final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, newViewName);
- View view = new View(createView.getName(), viewSql, newViewRelNode.getRowType(), workspaceSchemaPath);
-
- final String viewName = view.getName();
- final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName);
-
- if (existingTable != null) {
- if (existingTable.getJdbcTableType() != Schema.TableType.VIEW) {
- // existing table is not a view
- throw new ValidationException(
- String.format("A non-view table with given name [%s] already exists in schema [%s]",
- viewName, schemaPath));
- }
-
- if (existingTable.getJdbcTableType() == Schema.TableType.VIEW && !createView.getReplace()) {
- // existing table is a view and create view has no "REPLACE" clause
- throw new ValidationException(
- String.format("A view with given name [%s] already exists in schema [%s]",
- view.getName(), schemaPath));
- }
+ if (existingTable != null) {
+ if (existingTable.getJdbcTableType() != Schema.TableType.VIEW) {
+ // existing table is not a view
+ throw UserException.validationError()
+ .message("A non-view table with given name [%s] already exists in schema [%s]",
+ newViewName, schemaPath)
+ .build();
}
- boolean replaced;
- if (drillSchema instanceof WorkspaceSchema) {
- replaced = ((WorkspaceSchema) drillSchema).createView(view);
- } else {
- return DirectPlan.createDirectPlan(context, false, "Schema provided was not a workspace schema.");
+ if (existingTable.getJdbcTableType() == Schema.TableType.VIEW && !createView.getReplace()) {
+ // existing table is a view and create view has no "REPLACE" clause
+ throw UserException.validationError()
+ .message("A view with given name [%s] already exists in schema [%s]",
+ newViewName, schemaPath)
+ .build();
}
+ }
- String summary = String.format("View '%s' %s successfully in '%s' schema",
- createView.getName(), replaced ? "replaced" : "created", schemaPath);
+ final boolean replaced = drillSchema.createView(view);
+ final String summary = String.format("View '%s' %s successfully in '%s' schema",
+ createView.getName(), replaced ? "replaced" : "created", schemaPath);
- return DirectPlan.createDirectPlan(context, true, summary);
- } catch(Exception e) {
- logger.error("Failed to create view '{}'", createView.getName(), e);
- return DirectPlan.createDirectPlan(context, false, String.format("Error: %s", e.getMessage()));
- }
+ return DirectPlan.createDirectPlan(context, true, summary);
}
}
@@ -136,39 +115,26 @@ public abstract class ViewHandler extends AbstractSqlHandler {
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
SqlDropView dropView = unwrap(sqlNode, SqlDropView.class);
final String viewToDrop = dropView.getName();
+ final AbstractSchema drillSchema =
+ SchemaUtilites.resolveToMutableDrillSchema(context.getNewDefaultSchema(), dropView.getSchemaPath());
+
+ final String schemaPath = drillSchema.getFullSchemaName();
+
+ final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, viewToDrop);
+ if (existingTable != null && existingTable.getJdbcTableType() != Schema.TableType.VIEW) {
+ throw UserException.validationError()
+ .message("[%s] is not a VIEW in schema [%s]", viewToDrop, schemaPath)
+ .build();
+ } else if (existingTable == null) {
+ throw UserException.validationError()
+ .message("Unknown view [%s] in schema [%s].", viewToDrop, schemaPath)
+ .build();
+ }
- try {
- SchemaPlus schema = findSchema(context.getRootSchema(), context.getNewDefaultSchema(), dropView.getSchemaPath());
- AbstractSchema drillSchema = getDrillSchema(schema);
-
- String schemaPath = drillSchema.getFullSchemaName();
- if (!drillSchema.isMutable()) {
- return DirectPlan.createDirectPlan(context, false, String.format("Schema [%s] is immutable.", schemaPath));
- }
-
- if (!(drillSchema instanceof WorkspaceSchema)) {
- return DirectPlan.createDirectPlan(context, false,
- String.format("Schema [%s] doesn't support creating/dropping views.", schemaPath));
- }
-
- final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, viewToDrop);
- if (existingTable != null && existingTable.getJdbcTableType() != Schema.TableType.VIEW) {
- return DirectPlan.createDirectPlan(context, false,
- String.format("[%s] is not a VIEW in schema [%s]", viewToDrop, schemaPath));
- } else if (existingTable == null) {
- return DirectPlan.createDirectPlan(context, false,
- String.format("Unknown view [%s] in schema [%s].", viewToDrop, schemaPath));
- }
-
- ((WorkspaceSchema) drillSchema).dropView(viewToDrop);
+ drillSchema.dropView(viewToDrop);
- return DirectPlan.createDirectPlan(context, true,
- String.format("View [%s] deleted successfully from schema [%s].", viewToDrop, schemaPath));
- } catch(Exception e) {
- logger.debug("Failed to delete view {}", viewToDrop, e);
- return DirectPlan.createDirectPlan(context, false, String.format("Error: %s", e.getMessage()));
- }
+ return DirectPlan.createDirectPlan(context, true,
+ String.format("View [%s] deleted successfully from schema [%s].", viewToDrop, schemaPath));
}
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 9f1a695..e717eaa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -17,11 +17,17 @@
*/
package org.apache.drill.exec.rpc.user;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.calcite.schema.SchemaPlus;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
@@ -133,17 +139,37 @@ public class UserSession {
/**
* Update the schema path for the session.
- * @param fullPath The desired path to set to.
- * @param schema The root schema to find this path within.
- * @return true if the path was set successfully. false if this path was unavailable.
+ * @param newDefaultSchemaPath New default schema path to set. It could be relative to the current default schema or
+ * absolute schema.
+ * @param currentDefaultSchema Current default schema.
+ * @throws ValidationException If the given default schema path is invalid in current schema tree.
*/
- public boolean setDefaultSchemaPath(String fullPath, SchemaPlus schema) {
- SchemaPlus newDefault = findSchema(schema, fullPath);
+ public void setDefaultSchemaPath(String newDefaultSchemaPath, SchemaPlus currentDefaultSchema)
+ throws ValidationException {
+ final List<String> newDefaultPathAsList = Lists.newArrayList(newDefaultSchemaPath.split("\\."));
+ SchemaPlus newDefault;
+
+ // First try to find the given schema relative to the current default schema.
+ newDefault = SchemaUtilites.findSchema(currentDefaultSchema, newDefaultPathAsList);
+
if (newDefault == null) {
- return false;
+ // If we fail to find the schema relative to current default schema, consider the given new default schema path as
+ // absolute schema path.
+ newDefault = SchemaUtilites.findSchema(currentDefaultSchema, newDefaultPathAsList);
}
- setProp(SCHEMA, fullPath);
- return true;
+
+ if (newDefault == null) {
+ SchemaUtilites.throwSchemaNotFoundException(currentDefaultSchema, newDefaultSchemaPath);
+ }
+
+ setProp(SCHEMA, SchemaUtilites.getSchemaPath(newDefault));
+ }
+
+ /**
+ * @return Get current default schema path.
+ */
+ public String getDefaultSchemaPath() {
+ return getProp(SCHEMA);
}
/**
@@ -152,7 +178,20 @@ public class UserSession {
* @return A {@link org.apache.calcite.schema.SchemaPlus} object.
*/
public SchemaPlus getDefaultSchema(SchemaPlus rootSchema) {
- return findSchema(rootSchema, getProp(SCHEMA));
+ final String defaultSchemaPath = getProp(SCHEMA);
+
+ if (Strings.isNullOrEmpty(defaultSchemaPath)) {
+ return null;
+ }
+
+ final SchemaPlus defaultSchema = SchemaUtilites.findSchema(rootSchema, defaultSchemaPath);
+
+ if (defaultSchema == null) {
+ // If the current schema resolves to null, return root schema as the current default schema.
+ return defaultSchema;
+ }
+
+ return defaultSchema;
}
public boolean setSessionOption(String name, String value) {
@@ -166,17 +205,4 @@ public class UserSession {
private void setProp(String key, String value) {
properties.put(key, value);
}
-
- private SchemaPlus findSchema(SchemaPlus rootSchema, String schemaPath) {
- String[] paths = schemaPath.split("\\.");
- SchemaPlus schema = rootSchema;
- for (String p : paths) {
- schema = schema.getSubSchema(p);
- if (schema == null) {
- break;
- }
- }
- return schema;
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 33ddea5..6afce1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -29,6 +30,8 @@ import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.dotdrill.View;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import com.google.common.base.Joiner;
@@ -88,8 +91,34 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer
return this;
}
+ /**
+ * Create a new view given definition.
+ * @param view View info including name, definition etc.
+ * @return Returns true if an existing view is replaced with the given view. False otherwise.
+ * @throws IOException
+ */
+ public boolean createView(View view) throws IOException {
+ throw UserException.unsupportedError()
+ .message("Creating new view is not supported in schema [%s]", getSchemaPath())
+ .build();
+ }
+
+ /**
+ * Drop the view with given name.
+ *
+ * @param viewName
+ * @throws IOException
+ */
+ public void dropView(String viewName) throws IOException {
+ throw UserException.unsupportedError()
+ .message("Dropping a view is supported in schema [%s]", getSchemaPath())
+ .build();
+ }
+
public CreateTableEntry createNewTable(String tableName) {
- throw new UnsupportedOperationException("New tables are not allowed in this schema");
+ throw UserException.unsupportedError()
+ .message("Creating new tables is not supported in schema [%s]", getSchemaPath())
+ .build();
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 916564d..b1135d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.dfs;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -126,7 +125,8 @@ public class WorkspaceSchemaFactory {
this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf);
}
- public boolean createView(View view) throws Exception {
+ @Override
+ public boolean createView(View view) throws IOException {
Path viewPath = getViewPath(view.getName());
boolean replaced = fs.exists(viewPath);
final FsPermission viewPerms =
@@ -152,6 +152,7 @@ public class WorkspaceSchemaFactory {
return new SubDirectoryList(fileStatuses);
}
+ @Override
public void dropView(String viewName) throws IOException {
fs.delete(getViewPath(viewName), false);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 83134b3..a07f621 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -66,6 +66,8 @@ import static org.junit.Assert.assertThat;
public class BaseTestQuery extends ExecTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
+ protected static final String TEMP_SCHEMA = "dfs_test.tmp";
+
private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
private static final int MAX_WIDTH_PER_NODE = 2;
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
index 411660f..122542a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationMetadata.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.impersonation;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -207,8 +208,11 @@ public class TestImpersonationMetadata extends BaseTestImpersonation {
test("USE " + viewSchema);
- test("CREATE VIEW " + viewName + " AS SELECT " +
- "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;");
+ final String query = "CREATE VIEW " + viewName + " AS SELECT " +
+ "c_custkey, c_nationkey FROM cp.`tpch/customer.parquet` ORDER BY c_custkey;";
+ final String expErrorMsg = "PERMISSION ERROR: Permission denied: user=drillTestUser2, access=WRITE, " +
+ "inode=\"/drillTestGrp0_755\"";
+ errorMsgTestHelper(query, expErrorMsg);
// SHOW TABLES is expected to return no records as view creation fails above.
testBuilder()
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 958cf1a..5f57567 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -477,52 +477,6 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
- @Test // DRILL-2422
- public void createTableWhenATableWithSameNameAlreadyExists() throws Exception{
- final String newTblName = "createTableWhenTableAlreadyExists";
-
- try {
- test("USE dfs_test.tmp");
- final String ctas = String.format("CREATE TABLE %s AS SELECT * from cp.`region.json`", newTblName);
-
- test(ctas);
-
- testBuilder()
- .unOrdered()
- .sqlQuery(ctas)
- .baselineColumns("ok", "summary")
- .baselineValues(false,
- String.format("Error: A table or view with given name [%s] already exists in schema [%s]",
- newTblName, "dfs_test.tmp"))
- .go();
- } finally {
- deleteTableIfExists(newTblName);
- }
- }
-
- @Test // DRILL-2422
- public void createTableWhenAViewWithSameNameAlreadyExists() throws Exception{
- final String newTblName = "createTableWhenAViewWithSameNameAlreadyExists";
-
- try {
- test("USE dfs_test.tmp");
- final String createView = String.format("CREATE VIEW %s AS SELECT * from cp.`region.json`", newTblName);
-
- test(createView);
-
- testBuilder()
- .unOrdered()
- .sqlQuery(String.format("CREATE TABLE %s AS SELECT * FROM cp.`employee.json`", newTblName))
- .baselineColumns("ok", "summary")
- .baselineValues(false,
- String.format("Error: A table or view with given name [%s] already exists in schema [%s]",
- newTblName, "dfs_test.tmp"))
- .go();
- } finally {
- test("DROP VIEW " + newTblName);
- }
- }
-
private static void deleteTableIfExists(String tableName) {
try {
Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
index 5fff956..c4cc37a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
@@ -17,22 +17,24 @@
*/
package org.apache.drill.exec.sql;
-import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
import org.apache.drill.BaseTestQuery;
import org.junit.Test;
+import java.io.File;
+
public class TestCTAS extends BaseTestQuery {
@Test // DRILL-2589
public void withDuplicateColumnsInDef1() throws Exception {
ctasErrorTestHelper("CREATE TABLE %s.%s AS SELECT region_id, region_id FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "region_id")
+ String.format("Duplicate column name [%s]", "region_id")
);
}
@Test // DRILL-2589
public void withDuplicateColumnsInDef2() throws Exception {
ctasErrorTestHelper("CREATE TABLE %s.%s AS SELECT region_id, sales_city, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "sales_city")
+ String.format("Duplicate column name [%s]", "sales_city")
);
}
@@ -41,7 +43,7 @@ public class TestCTAS extends BaseTestQuery {
ctasErrorTestHelper(
"CREATE TABLE %s.%s(regionid, regionid) " +
"AS SELECT region_id, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "regionid")
+ String.format("Duplicate column name [%s]", "regionid")
);
}
@@ -50,7 +52,7 @@ public class TestCTAS extends BaseTestQuery {
ctasErrorTestHelper(
"CREATE TABLE %s.%s(regionid, salescity, salescity) " +
"AS SELECT region_id, sales_city, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "salescity")
+ String.format("Duplicate column name [%s]", "salescity")
);
}
@@ -59,7 +61,7 @@ public class TestCTAS extends BaseTestQuery {
ctasErrorTestHelper(
"CREATE TABLE %s.%s(regionid, salescity, SalesCity) " +
"AS SELECT region_id, sales_city, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "SalesCity")
+ String.format("Duplicate column name [%s]", "SalesCity")
);
}
@@ -68,7 +70,7 @@ public class TestCTAS extends BaseTestQuery {
ctasErrorTestHelper(
"CREATE TABLE %s.%s(regionid, salescity) " +
"AS SELECT region_id, sales_city, sales_region FROM cp.`region.json`",
- "Error: table's field list and the table's query field list have different counts."
+ "table's field list and the table's query field list have different counts."
);
}
@@ -77,19 +79,47 @@ public class TestCTAS extends BaseTestQuery {
ctasErrorTestHelper(
"CREATE TABLE %s.%s(regionid, salescity) " +
"AS SELECT region_id, * FROM cp.`region.json`",
- "Error: table's query field list has a '*', which is invalid when table's field list is specified."
+ "table's query field list has a '*', which is invalid when table's field list is specified."
);
}
- private static void ctasErrorTestHelper(final String ctasSql, final String errorMsg)
- throws Exception {
- final String createTableSql = String.format(ctasSql, "dfs_test.tmp", "testTableName");
+ @Test // DRILL-2422
+ public void createTableWhenATableWithSameNameAlreadyExists() throws Exception{
+ final String newTblName = "createTableWhenTableAlreadyExists";
+
+ try {
+ final String ctasQuery =
+ String.format("CREATE TABLE %s.%s AS SELECT * from cp.`region.json`", TEMP_SCHEMA, newTblName);
+
+ test(ctasQuery);
+
+ errorMsgTestHelper(ctasQuery,
+ String.format("A table or view with given name [%s] already exists in schema [%s]", newTblName, TEMP_SCHEMA));
+ } finally {
+ FileUtils.deleteQuietly(new File(getDfsTestTmpSchemaLocation(), newTblName));
+ }
+ }
+
+ @Test // DRILL-2422
+ public void createTableWhenAViewWithSameNameAlreadyExists() throws Exception{
+ final String newTblName = "createTableWhenAViewWithSameNameAlreadyExists";
+
+ try {
+ test(String.format("CREATE VIEW %s.%s AS SELECT * from cp.`region.json`", TEMP_SCHEMA, newTblName));
+
+ final String ctasQuery =
+ String.format("CREATE TABLE %s.%s AS SELECT * FROM cp.`employee.json`", TEMP_SCHEMA, newTblName);
+
+ errorMsgTestHelper(ctasQuery,
+ String.format("A table or view with given name [%s] already exists in schema [%s]",
+ newTblName, "dfs_test.tmp"));
+ } finally {
+ test(String.format("DROP VIEW %s.%s", TEMP_SCHEMA, newTblName));
+ }
+ }
- testBuilder()
- .sqlQuery(createTableSql)
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false, errorMsg)
- .go();
+ private static void ctasErrorTestHelper(final String ctasSql, final String expErrorMsg) throws Exception {
+ final String createTableSql = String.format(ctasSql, TEMP_SCHEMA, "testTableName");
+ errorMsgTestHelper(createTableSql, expErrorMsg);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index 8bcbc7a..9a35be4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -285,18 +285,38 @@ public class TestInfoSchema extends BaseTestQuery {
.sqlQuery("USE dfs_test.`default`")
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(true, "Default schema changed to 'dfs_test.default'")
+ .baselineValues(true, "Default schema changed to [dfs_test.default]")
.go();
}
@Test
- public void useSchemaNegative() throws Exception{
+ public void useSubSchemaWithinSchema() throws Exception{
+ testBuilder()
+ .sqlQuery("USE dfs_test")
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, "Default schema changed to [dfs_test]")
+ .go();
+
testBuilder()
- .sqlQuery("USE invalid.schema")
+ .sqlQuery("USE tmp")
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(false, "Failed to change default schema to 'invalid.schema'")
+ .baselineValues(true, "Default schema changed to [dfs_test.tmp]")
.go();
+
+ testBuilder()
+ .sqlQuery("USE dfs.`default`")
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, "Default schema changed to [dfs.default]")
+ .go();
+ }
+
+ @Test
+ public void useSchemaNegative() throws Exception{
+ errorMsgTestHelper("USE invalid.schema",
+ "Schema [invalid.schema] is not valid with respect to either root schema or current default schema.");
}
// Tests using backticks around the complete schema path
http://git-wip-us.apache.org/repos/asf/drill/blob/6cc89e99/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
index 0fc1f32..e3156d0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
@@ -26,8 +26,6 @@ import java.io.File;
import java.util.List;
public class TestViewSupport extends TestBaseViewSupport {
- private static final String TEMP_SCHEMA = "dfs_test.tmp";
-
@Test
public void referToSchemaInsideAndOutsideView() throws Exception {
String use = "use dfs_test.tmp;";
@@ -273,13 +271,8 @@ public class TestViewSupport extends TestBaseViewSupport {
// Try to create the view with same name in same schema.
final String createViewSql = String.format("CREATE VIEW %s.`%s` AS %s", TEMP_SCHEMA, viewName, viewDef1);
- testBuilder()
- .sqlQuery(createViewSql)
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false,
- String.format("Error: A view with given name [%s] already exists in schema [%s]", viewName, TEMP_SCHEMA))
- .go();
+ errorMsgTestHelper(createViewSql,
+ String.format("A view with given name [%s] already exists in schema [%s]", viewName, TEMP_SCHEMA));
// Try creating the view with same name in same schema, but with CREATE OR REPLACE VIEW clause
final String viewDef2 = "SELECT sales_state_province FROM cp.`region.json` ORDER BY `region_id`";
@@ -296,7 +289,7 @@ public class TestViewSupport extends TestBaseViewSupport {
// Make sure the new view created returns the data expected.
queryViewHelper(String.format("SELECT * FROM %s.`%s` LIMIT 1", TEMP_SCHEMA, viewName),
- new String[]{"sales_state_province" },
+ new String[]{"sales_state_province"},
ImmutableList.of(new Object[]{"None"})
);
} finally {
@@ -315,25 +308,13 @@ public class TestViewSupport extends TestBaseViewSupport {
// Try to create the view with same name in same schema.
final String createViewSql = String.format("CREATE VIEW %s.`%s` AS %s", TEMP_SCHEMA, tableName, tableDef1);
- testBuilder()
- .sqlQuery(createViewSql)
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false,
- String.format("Error: A non-view table with given name [%s] already exists in schema [%s]",
- tableName, TEMP_SCHEMA))
- .go();
+ errorMsgTestHelper(createViewSql,
+ String.format("A non-view table with given name [%s] already exists in schema [%s]", tableName, TEMP_SCHEMA));
// Try creating the view with same name in same schema, but with CREATE OR REPLACE VIEW clause
final String viewDef2 = "SELECT sales_state_province FROM cp.`region.json` ORDER BY `region_id`";
- testBuilder()
- .sqlQuery(String.format("CREATE OR REPLACE VIEW %s.`%s` AS %s", TEMP_SCHEMA, tableName, viewDef2))
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false,
- String.format("Error: A non-view table with given name [%s] already exists in schema [%s]",
- tableName, TEMP_SCHEMA))
- .go();
+ errorMsgTestHelper(String.format("CREATE OR REPLACE VIEW %s.`%s` AS %s", TEMP_SCHEMA, tableName, viewDef2),
+ String.format("A non-view table with given name [%s] already exists in schema [%s]", tableName, TEMP_SCHEMA));
} finally {
FileUtils.deleteQuietly(new File(getDfsTestTmpSchemaLocation(), tableName));
}
@@ -445,7 +426,7 @@ public class TestViewSupport extends TestBaseViewSupport {
test("USE dfs_test");
queryViewHelper(
String.format("SELECT * FROM %s.`%s` ORDER BY region_id DESC LIMIT 1", "tmp", viewName),
- baselineColumns,baselineValues);
+ baselineColumns, baselineValues);
} finally {
dropViewHelper(TEMP_SCHEMA, viewName, TEMP_SCHEMA);
@@ -485,14 +466,14 @@ public class TestViewSupport extends TestBaseViewSupport {
public void createViewWithDuplicateColumnsInDef1() throws Exception {
createViewErrorTestHelper(
"CREATE VIEW %s.%s AS SELECT region_id, region_id FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "region_id")
+ String.format("Duplicate column name [%s]", "region_id")
);
}
@Test // DRILL-2589
public void createViewWithDuplicateColumnsInDef2() throws Exception {
createViewErrorTestHelper("CREATE VIEW %s.%s AS SELECT region_id, sales_city, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "sales_city")
+ String.format("Duplicate column name [%s]", "sales_city")
);
}
@@ -500,7 +481,7 @@ public class TestViewSupport extends TestBaseViewSupport {
public void createViewWithDuplicateColumnsInDef3() throws Exception {
createViewErrorTestHelper(
"CREATE VIEW %s.%s(regionid, regionid) AS SELECT region_id, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "regionid")
+ String.format("Duplicate column name [%s]", "regionid")
);
}
@@ -509,7 +490,7 @@ public class TestViewSupport extends TestBaseViewSupport {
createViewErrorTestHelper(
"CREATE VIEW %s.%s(regionid, salescity, salescity) " +
"AS SELECT region_id, sales_city, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "salescity")
+ String.format("Duplicate column name [%s]", "salescity")
);
}
@@ -518,7 +499,7 @@ public class TestViewSupport extends TestBaseViewSupport {
createViewErrorTestHelper(
"CREATE VIEW %s.%s(regionid, salescity, SalesCity) " +
"AS SELECT region_id, sales_city, sales_city FROM cp.`region.json`",
- String.format("Error: Duplicate column name [%s]", "SalesCity")
+ String.format("Duplicate column name [%s]", "SalesCity")
);
}
@@ -528,7 +509,7 @@ public class TestViewSupport extends TestBaseViewSupport {
"CREATE VIEW %s.%s " +
"AS SELECT t1.region_id, t2.region_id FROM cp.`region.json` t1 JOIN cp.`region.json` t2 " +
"ON t1.region_id = t2.region_id LIMIT 1",
- String.format("Error: Duplicate column name [%s]", "region_id")
+ String.format("Duplicate column name [%s]", "region_id")
);
}
@@ -566,7 +547,7 @@ public class TestViewSupport extends TestBaseViewSupport {
createViewErrorTestHelper(
"CREATE VIEW %s.%s(regionid, salescity) " +
"AS SELECT region_id, sales_city, sales_region FROM cp.`region.json`",
- "Error: view's field list and the view's query field list have different counts."
+ "view's field list and the view's query field list have different counts."
);
}
@@ -575,40 +556,25 @@ public class TestViewSupport extends TestBaseViewSupport {
createViewErrorTestHelper(
"CREATE VIEW %s.%s(regionid, salescity) " +
"AS SELECT region_id, * FROM cp.`region.json`",
- "Error: view's query field list has a '*', which is invalid when view's field list is specified."
+ "view's query field list has a '*', which is invalid when view's field list is specified."
);
}
- private static void createViewErrorTestHelper(final String viewSql, final String errorMsg)
- throws Exception {
+ private static void createViewErrorTestHelper(final String viewSql, final String expErrorMsg) throws Exception {
final String createViewSql = String.format(viewSql, TEMP_SCHEMA, "duplicateColumnsInViewDef");
-
- testBuilder()
- .sqlQuery(createViewSql)
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false, errorMsg)
- .go();
+ errorMsgTestHelper(createViewSql, expErrorMsg);
}
@Test // DRILL-2423
public void showProperMsgWhenDroppingNonExistentView() throws Exception{
- testBuilder()
- .sqlQuery("DROP VIEW dfs_test.tmp.nonExistentView")
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false, "Unknown view [nonExistentView] in schema [dfs_test.tmp].")
- .go();
+ errorMsgTestHelper("DROP VIEW dfs_test.tmp.nonExistentView",
+ "Unknown view [nonExistentView] in schema [dfs_test.tmp].");
}
@Test // DRILL-2423
public void showProperMsgWhenTryingToDropAViewInImmutableSchema() throws Exception{
- testBuilder()
- .sqlQuery("DROP VIEW cp.nonExistentView")
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false, "Schema [cp.default] is immutable.")
- .go();
+ errorMsgTestHelper("DROP VIEW cp.nonExistentView",
+ "Unable to create or drop tables/views. Schema [cp] is immutable.");
}
@Test // DRILL-2423
@@ -618,12 +584,8 @@ public class TestViewSupport extends TestBaseViewSupport {
test(String.format("CREATE TABLE %s.%s AS SELECT c_custkey, c_nationkey from cp.`tpch/customer.parquet`",
TEMP_SCHEMA, testTableName));
- testBuilder()
- .sqlQuery(String.format("DROP VIEW %s.%s", TEMP_SCHEMA, testTableName))
- .unOrdered()
- .baselineColumns("ok", "summary")
- .baselineValues(false, "[testTableShowErrorMsg] is not a VIEW in schema [dfs_test.tmp]")
- .go();
+ errorMsgTestHelper(String.format("DROP VIEW %s.%s", TEMP_SCHEMA, testTableName),
+ "[testTableShowErrorMsg] is not a VIEW in schema [dfs_test.tmp]");
} finally {
File tblPath = new File(getDfsTestTmpSchemaLocation(), testTableName);
FileUtils.deleteQuietly(tblPath);
[4/4] drill git commit: DRILL-2977,
DRILL-2978: Swap fragment execution method implementations,
and cancellation changes
Posted by ve...@apache.org.
DRILL-2977, DRILL-2978: Swap fragment execution method implementations, and cancellation changes
Execution: In WorkManager,
+ swap implementations of startFragmentPendingRemote() and addFragmentRunner()
+ warn if there are running fragments in close()
Cancellation:
+ for fragments waiting on data, delegate cancellations to WorkEventBus (in Foreman and ControlMessageHandler)
+ documentation
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8d577836
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8d577836
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8d577836
Branch: refs/heads/master
Commit: 8d57783623a4ed1691dcebf7b059ad0fd70d8ad7
Parents: eb79e80
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Mon May 11 12:02:35 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:06 2015 -0700
----------------------------------------------------------------------
.../drill/exec/rpc/control/WorkEventBus.java | 55 +++++++++++++++++---
.../org/apache/drill/exec/work/WorkManager.java | 45 +++++++++++++---
.../exec/work/batch/ControlMessageHandler.java | 23 +++++---
.../drill/exec/work/foreman/QueryManager.java | 21 ++++++--
.../exec/work/fragment/RootFragmentManager.java | 1 +
5 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index d90096a..ddd7828 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -84,11 +84,13 @@ public class WorkEventBus {
}
public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
- return managers.get(handle);
+ synchronized (this) {
+ return managers.get(handle);
+ }
}
public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
- // check if this was a recently canceled fragment. If so, throw away message.
+ // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
if (logger.isDebugEnabled()) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
@@ -97,19 +99,58 @@ public class WorkEventBus {
}
// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- final FragmentManager m = managers.get(handle);
- if(m != null) {
- return m;
+ synchronized (this) {
+ final FragmentManager m = managers.get(handle);
+ if (m != null) {
+ return m;
+ }
}
throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
+ QueryIdHelper.getQueryIdentifier(handle));
}
+ /**
+ * Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
+ * multiple times. The manager will be removed only once (the first call).
+ * @param handle the handle to the fragment
+ */
public void removeFragmentManager(final FragmentHandle handle) {
if (logger.isDebugEnabled()) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
+
+ synchronized (this) {
+ final FragmentManager manager = managers.get(handle);
+ if (manager != null) {
+ recentlyFinishedFragments.put(handle, 1);
+ managers.remove(handle);
+ } else {
+ logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
+ }
+ }
+ }
+
+ /**
+ * Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
+ * for fragments waiting on data (root and intermediate).
+ * @param handle the handle to the fragment
+ * @return if the fragment was found and removed from the event bus
+ */
+ public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+ }
+
+ synchronized (this) {
+ final FragmentManager manager = managers.get(handle);
+ if (manager == null) {
+ return false;
+ }
+
+ manager.cancel();
+ recentlyFinishedFragments.put(handle, 1);
+ managers.remove(handle);
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1d3a0b0..5939113 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.RpcException;
@@ -178,6 +179,16 @@ public class WorkManager implements AutoCloseable {
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
}
+
+ if (!runningFragments.isEmpty()) {
+ logger.warn("Closing WorkManager but there are {} running fragments.", runningFragments.size());
+ if (logger.isDebugEnabled()) {
+ for (final FragmentHandle handle : runningFragments.keySet()) {
+ logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(handle),
+ runningFragments.get(handle).getStatus());
+ }
+ }
+ }
}
public DrillbitContext getContext() {
@@ -261,14 +272,10 @@ public class WorkManager implements AutoCloseable {
return dContext;
}
- public void startFragmentPendingRemote(final FragmentManager handler) {
- final FragmentExecutor fragmentExecutor = handler.getRunnable();
- // cancelled fragment managers will return null fragment executors
- if (fragmentExecutor != null) {
- executor.execute(fragmentExecutor);
- }
- }
-
+ /**
+ * Currently used to start a root fragment that is not blocked on data, and leaf fragments.
+ * @param fragmentExecutor the executor to run
+ */
public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {
final FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle();
runningFragments.put(fragmentHandle, fragmentExecutor);
@@ -276,6 +283,28 @@ public class WorkManager implements AutoCloseable {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
+ indicateIfSafeToExit();
+ }
+ });
+ }
+
+ /**
+ * Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
+ * called, when the first batch arrives, by {@link org.apache.drill.exec.rpc.data.DataResponseHandlerImpl#handle}
+ * @param fragmentManager the manager for the fragment
+ */
+ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
+ final FragmentHandle fragmentHandle = fragmentManager.getHandle();
+ final FragmentExecutor fragmentExecutor = fragmentManager.getRunnable();
+ if (fragmentExecutor == null) {
+ // the fragment was most likely cancelled
+ return;
+ }
+ runningFragments.put(fragmentHandle, fragmentExecutor);
+ executor.execute(new SelfCleaningRunnable(fragmentExecutor) {
+ @Override
+ protected void cleanup() {
+ runningFragments.remove(fragmentHandle);
workBus.removeFragmentManager(fragmentHandle);
indicateIfSafeToExit();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index d12e6d5..421ad7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -159,21 +159,32 @@ public class ControlMessageHandler {
* @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
*/
private Ack cancelFragment(final FragmentHandle handle) {
- // cancel a pending fragment
- final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
- if (manager != null) {
- manager.cancel();
+ /**
+ * For case 1, see {@link org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}.
+ * In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and
+ * "inactive" refers to FINISHED, CANCELLATION_REQUESTED, CANCELLED, FAILED
+ */
+
+ // Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
+ // work bus.
+ final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+ if (removed) {
return Acks.OK;
}
- // cancel a running fragment
+ // Case 3: Cancel active leaf fragment. Such a fragment will be with the worker bee if and only if it is running.
+ // Cancel directly in this case.
final FragmentExecutor runner = bee.getFragmentRunner(handle);
if (runner != null) {
runner.cancel();
return Acks.OK;
}
- // fragment completed or does not exist
+ // Other cases: Fragment completed or does not exist. Currently known cases:
+ // (1) Leaf or intermediate fragment that is inactive: although we should not receive a cancellation
+ // request; it is possible that before the fragment state was updated in the QueryManager, this handler
+ // received a cancel signal.
+ // (2) Unknown fragment.
logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
return Acks.OK;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 090a377..84a38a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
@@ -174,7 +175,16 @@ public class QueryManager {
}
/**
- * Stop all fragments with a currently active status.
+ * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
+ * (1) Root fragment
+ * (a) If the root is pending, delegate the cancellation to local work bus.
+ * (b) If the root is running, cancel the fragment directly.
+ *
+ * For the actual cancel calls for intermediate and leaf fragments, see
+ * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+ * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
+ * fragments). The actual cancel is done by delegating the cancel to the work bus.
+ * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
final Controller controller = drillbitContext.getController();
@@ -183,11 +193,16 @@ public class QueryManager {
case SENDING:
case AWAITING_ALLOCATION:
case RUNNING:
- if (rootRunner.getContext().getHandle().equals(data.getHandle())) {
+ final FragmentHandle handle = data.getHandle();
+ if (rootRunner.getContext().getHandle().equals(handle)) {
+ // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
+ final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+ // Case 1.b: running root. Cancel directly.
+ if (!removed) {
rootRunner.cancel();
+ }
} else {
final DrillbitEndpoint endpoint = data.getEndpoint();
- final FragmentHandle handle = data.getHandle();
// TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
SignalListener.Signal.CANCEL), handle);
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 67ef9b8..b770a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -67,6 +67,7 @@ public class RootFragmentManager implements FragmentManager {
@Override
public void cancel() {
cancel = true;
+ runner.cancel();
}
@Override