Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/06 00:31:32 UTC
svn commit: r1510788 [8/10] - in /hive/branches/tez: ./ cli/src/java/org/apache/hadoop/hive/cli/ cli/src/test/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ eclipse-templates/ hcatalog/storage-handlers/hbase/...
Modified: hive/branches/tez/ql/src/test/results/clientpositive/multi_join_union.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/multi_join_union.q.out?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/multi_join_union.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/multi_join_union.q.out Mon Aug 5 22:31:28 2013
@@ -41,7 +41,7 @@ ABSTRACT SYNTAX TREE:
STAGE DEPENDENCIES:
Stage-8 is a root stage
- Stage-7 depends on stages: Stage-8
+ Stage-6 depends on stages: Stage-8
Stage-0 is a root stage
STAGE PLANS:
@@ -125,7 +125,7 @@ STAGE PLANS:
1 [Column[_col1]]
Position of Big Table: 0
- Stage: Stage-7
+ Stage: Stage-6
Map Reduce
Alias -> Map Operator Tree:
b
Modified: hive/branches/tez/ql/src/test/results/clientpositive/nonblock_op_deduplicate.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/nonblock_op_deduplicate.q.out?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/nonblock_op_deduplicate.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/nonblock_op_deduplicate.q.out Mon Aug 5 22:31:28 2013
@@ -42,3 +42,362 @@ STAGE PLANS:
limit: -1
+PREHOOK: query: -- This test query is introduced for HIVE-4968.
+-- First, we do not convert the join to MapJoin.
+EXPLAIN
+SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+PREHOOK: type: QUERY
+POSTHOOK: query: -- This test query is introduced for HIVE-4968.
+-- First, we do not convert the join to MapJoin.
+EXPLAIN
+SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value))))) tmp1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) tmp2) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count) count)))) tmp3))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp2) key) key) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp2) value) value) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp3) count) count)))) tmp4)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp4) key) key) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp4) value) value) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp4) count) count))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ tmp4:tmp3:src1
+ TableScan
+ alias: src1
+ Select Operator
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ mode: hash
+ outputColumnNames: _col0
+ Reduce Output Operator
+ sort order:
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ bucketGroup: false
+ mode: mergepartial
+ outputColumnNames: _col0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: bigint
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ Reduce Output Operator
+ sort order:
+ tag: 1
+ value expressions:
+ expr: _col0
+ type: bigint
+ tmp4:tmp2:tmp1:src1
+ TableScan
+ alias: src1
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ sort order:
+ tag: 0
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1 {VALUE._col0}
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1, _col2
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ expr: _col2
+ type: bigint
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src1
+#### A masked pattern was here ####
+238 val_238 25
+ 25
+311 val_311 25
+ val_27 25
+ val_165 25
+ val_409 25
+255 val_255 25
+278 val_278 25
+98 val_98 25
+ val_484 25
+ val_265 25
+ val_193 25
+401 val_401 25
+150 val_150 25
+273 val_273 25
+224 25
+369 25
+66 val_66 25
+128 25
+213 val_213 25
+146 val_146 25
+406 val_406 25
+ 25
+ 25
+ 25
+PREHOOK: query: -- Then, we convert the join to MapJoin.
+EXPLAIN
+SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Then, we convert the join to MapJoin.
+EXPLAIN
+SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value))))) tmp1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) tmp2) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count) count)))) tmp3))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp2) key) key) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp2) value) value) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp3) count) count)))) tmp4)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp4) key) key) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp4) value) value) (TOK_SELEXPR (. (TOK_TABLE_OR_COL tmp4) count) count))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-5 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-5
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ tmp4:tmp3:src1
+ TableScan
+ alias: src1
+ Select Operator
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ mode: hash
+ outputColumnNames: _col0
+ Reduce Output Operator
+ sort order:
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ bucketGroup: false
+ mode: mergepartial
+ outputColumnNames: _col0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: bigint
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-5
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ tmp4:tmp2:tmp1:src1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ tmp4:tmp2:tmp1:src1
+ TableScan
+ alias: src1
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ HashTable Sink Operator
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1 {_col0}
+ handleSkewJoin: false
+ keys:
+ 0 []
+ 1 []
+ Position of Big Table: 1
+
+ Stage: Stage-4
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1 {_col0}
+ handleSkewJoin: false
+ keys:
+ 0 []
+ 1 []
+ outputColumnNames: _col0, _col1, _col2
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ expr: _col2
+ type: bigint
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT tmp4.key as key, tmp4.value as value, tmp4.count as count
+FROM (SELECT tmp2.key as key, tmp2.value as value, tmp3.count as count
+ FROM (SELECT *
+ FROM (SELECT key, value
+ FROM src1) tmp1 ) tmp2
+ JOIN (SELECT count(*) as count
+ FROM src1) tmp3
+ ) tmp4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src1
+#### A masked pattern was here ####
+238 val_238 25
+ 25
+311 val_311 25
+ val_27 25
+ val_165 25
+ val_409 25
+255 val_255 25
+278 val_278 25
+98 val_98 25
+ val_484 25
+ val_265 25
+ val_193 25
+401 val_401 25
+150 val_150 25
+273 val_273 25
+224 25
+369 25
+66 val_66 25
+128 25
+213 val_213 25
+146 val_146 25
+406 val_406 25
+ 25
+ 25
+ 25
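The two plans above differ only in how the join against the one-row count(*) aggregate is evaluated: the first shuffles both sides to a common Join Operator in Stage-2, while the second builds the small side locally in Stage-5 (Map Reduce Local Work) and streams src1 past it through the Map Join Operator in Stage-4. The following minimal sketch illustrates that idea only; the class name and sample rows are hypothetical, and this is not Hive's implementation.

import java.util.ArrayList;
import java.util.List;

public class ScalarMapJoinSketch {
  public static void main(String[] args) {
    // Big side: hypothetical (key, value) rows standing in for src1.
    List<String[]> bigSide = new ArrayList<String[]>();
    bigSide.add(new String[] {"238", "val_238"});
    bigSide.add(new String[] {"311", "val_311"});

    // Small side: the single count(*) row, materialized up front. With no
    // join keys (keys: 0 [], 1 [] above), the hash table degenerates to one
    // broadcast value.
    long count = 25L;

    // "Map phase": emit each big-side row joined with the broadcast scalar,
    // with no shuffle of the big side.
    for (String[] row : bigSide) {
      System.out.println(row[0] + "\t" + row[1] + "\t" + count);
    }
  }
}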
Modified: hive/branches/tez/ql/src/test/results/clientpositive/serde_user_properties.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/serde_user_properties.q.out?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/serde_user_properties.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/serde_user_properties.q.out Mon Aug 5 22:31:28 2013
@@ -44,7 +44,6 @@ STAGE PLANS:
TotalFiles: 1
GatherStats: false
MultiFileSpray: false
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -140,7 +139,6 @@ STAGE PLANS:
TotalFiles: 1
GatherStats: false
MultiFileSpray: false
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -240,7 +238,6 @@ STAGE PLANS:
a
percentage: 1.0
seed number: 0
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -336,7 +333,6 @@ STAGE PLANS:
TotalFiles: 1
GatherStats: false
MultiFileSpray: false
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -438,7 +434,6 @@ STAGE PLANS:
src
percentage: 1.0
seed number: 0
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -493,7 +488,6 @@ STAGE PLANS:
Fetch Operator
limit: -1
-
PREHOOK: query: explain extended select a.key from src ('user.defined.key'='some.value') a
PREHOOK: type: QUERY
POSTHOOK: query: explain extended select a.key from src ('user.defined.key'='some.value') a
@@ -536,7 +530,6 @@ STAGE PLANS:
TotalFiles: 1
GatherStats: false
MultiFileSpray: false
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -638,7 +631,6 @@ STAGE PLANS:
a
percentage: 1.0
seed number: 0
- Needs Tagging: false
Path -> Alias:
#### A masked pattern was here ####
Path -> Partition:
@@ -693,4 +685,3 @@ STAGE PLANS:
Fetch Operator
limit: -1
-
Modified: hive/branches/tez/ql/src/test/results/clientpositive/union34.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/union34.q.out?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/union34.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/union34.q.out Mon Aug 5 22:31:28 2013
@@ -1,22 +1,6 @@
-PREHOOK: query: -- HIVE-4342
--- Maponly union(UNION-13) is merged into non-maponly union(UNION-15)
--- In this case, task for UNION-13 should be removed from top-task and merged into task for UNION-15
--- TS[2]-SEL[3]-RS[5]-JOIN[6]-SEL[7]-UNION[15]-SEL[16]-RS[17]-EX[18]-FS[19]
--- TS[0]-SEL[1]-RS[4]-JOIN[6]
--- TS[8]-SEL[9]-UNION[13]-SEL[14]-UNION[15]
--- TS[11]-SEL[12]-UNION[13]
-
-create table src10_1 (key string, value string)
+PREHOOK: query: create table src10_1 (key string, value string)
PREHOOK: type: CREATETABLE
-POSTHOOK: query: -- HIVE-4342
--- Maponly union(UNION-13) is merged into non-maponly union(UNION-15)
--- In this case, task for UNION-13 should be removed from top-task and merged into task for UNION-15
--- TS[2]-SEL[3]-RS[5]-JOIN[6]-SEL[7]-UNION[15]-SEL[16]-RS[17]-EX[18]-FS[19]
--- TS[0]-SEL[1]-RS[4]-JOIN[6]
--- TS[8]-SEL[9]-UNION[13]-SEL[14]-UNION[15]
--- TS[11]-SEL[12]-UNION[13]
-
-create table src10_1 (key string, value string)
+POSTHOOK: query: create table src10_1 (key string, value string)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: default@src10_1
PREHOOK: query: create table src10_2 (key string, value string)
@@ -64,14 +48,18 @@ POSTHOOK: Lineage: src10_3.key SIMPLE [(
POSTHOOK: Lineage: src10_3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
POSTHOOK: Lineage: src10_4.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
POSTHOOK: Lineage: src10_4.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
-PREHOOK: query: explain
+PREHOOK: query: -- When we convert the Join of sub1 and sub0 into a MapJoin,
+-- we can use a single MR job to evaluate this entire query.
+explain
SELECT * FROM (
SELECT sub1.key,sub1.value FROM (SELECT * FROM src10_1) sub1 JOIN (SELECT * FROM src10_2) sub0 ON (sub0.key = sub1.key)
UNION ALL
SELECT key,value FROM (SELECT * FROM (SELECT * FROM src10_3) sub2 UNION ALL SELECT * FROM src10_4 ) alias0
) alias1 order by key
PREHOOK: type: QUERY
-POSTHOOK: query: explain
+POSTHOOK: query: -- When we convert the Join of sub1 and sub0 into a MapJoin,
+-- we can use a single MR job to evaluate this entire query.
+explain
SELECT * FROM (
SELECT sub1.key,sub1.value FROM (SELECT * FROM src10_1) sub1 JOIN (SELECT * FROM src10_2) sub0 ON (sub0.key = sub1.key)
UNION ALL
@@ -91,8 +79,7 @@ ABSTRACT SYNTAX TREE:
STAGE DEPENDENCIES:
Stage-7 is a root stage
- Stage-6 depends on stages: Stage-7
- Stage-2 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-7
Stage-0 is a root stage
STAGE PLANS:
@@ -123,7 +110,7 @@ STAGE PLANS:
1 [Column[_col0]]
Position of Big Table: 1
- Stage: Stage-6
+ Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
null-subquery1:alias1-subquery1:sub0:src10_2
@@ -153,39 +140,25 @@ STAGE PLANS:
expr: _col1
type: string
outputColumnNames: _col0, _col1
- File Output Operator
- compressed: false
- GlobalTableId: 0
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- Local Work:
- Map Reduce Local Work
-
- Stage: Stage-2
- Map Reduce
- Alias -> Map Operator Tree:
-#### A masked pattern was here ####
- TableScan
- Union
- Select Operator
- expressions:
- expr: _col0
- type: string
- expr: _col1
- type: string
- outputColumnNames: _col0, _col1
- Reduce Output Operator
- key expressions:
- expr: _col0
- type: string
- sort order: +
- tag: -1
- value expressions:
- expr: _col0
- type: string
- expr: _col1
- type: string
+ Union
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
null-subquery2:alias1-subquery2-subquery1:alias0-subquery1:sub2:src10_3
TableScan
alias: src10_3
@@ -260,6 +233,8 @@ STAGE PLANS:
type: string
expr: _col1
type: string
+ Local Work:
+ Map Reduce Local Work
Reduce Operator Tree:
Extract
File Output Operator
@@ -334,14 +309,22 @@ POSTHOOK: Lineage: src10_4.value SIMPLE
98 val_98
98 val_98
98 val_98
-PREHOOK: query: explain
+PREHOOK: query: -- When we do not convert the Join of sub1 and sub0 into a MapJoin,
+-- we need to use two MR jobs to evaluate this query.
+-- The first job is for the Join of sub1 and sub2. The second job
+-- is for the UNION ALL and ORDER BY.
+explain
SELECT * FROM (
SELECT sub1.key,sub1.value FROM (SELECT * FROM src10_1) sub1 JOIN (SELECT * FROM src10_2) sub0 ON (sub0.key = sub1.key)
UNION ALL
SELECT key,value FROM (SELECT * FROM (SELECT * FROM src10_3) sub2 UNION ALL SELECT * FROM src10_4 ) alias0
) alias1 order by key
PREHOOK: type: QUERY
-POSTHOOK: query: explain
+POSTHOOK: query: -- When we do not convert the Join of sub1 and sub0 into a MapJoin,
+-- we need to use two MR jobs to evaluate this query.
+-- The first job is for the Join of sub1 and sub2. The second job
+-- is for the UNION ALL and ORDER BY.
+explain
SELECT * FROM (
SELECT sub1.key,sub1.value FROM (SELECT * FROM src10_1) sub1 JOIN (SELECT * FROM src10_2) sub0 ON (sub0.key = sub1.key)
UNION ALL
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java Mon Aug 5 22:31:28 2013
@@ -17,8 +17,19 @@
*/
package org.apache.hadoop.hive.serde2.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
@@ -32,21 +43,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.Writable;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
class AvroDeserializer {
private static final Log LOG = LogFactory.getLog(AvroDeserializer.class);
/**
@@ -62,7 +64,7 @@ class AvroDeserializer {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
private BinaryDecoder binaryDecoder = null;
- private InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>> gdrCache
+ private final InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>> gdrCache
= new InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>>() {
@Override
protected GenericDatumReader<GenericRecord> makeInstance(ReaderWriterSchemaPair hv) {
@@ -112,13 +114,15 @@ class AvroDeserializer {
*/
public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
Writable writable, Schema readerSchema) throws AvroSerdeException {
- if(!(writable instanceof AvroGenericRecordWritable))
+ if(!(writable instanceof AvroGenericRecordWritable)) {
throw new AvroSerdeException("Expecting a AvroGenericRecordWritable");
+ }
- if(row == null || row.size() != columnNames.size())
+ if(row == null || row.size() != columnNames.size()) {
row = new ArrayList<Object>(columnNames.size());
- else
+ } else {
row.clear();
+ }
AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
GenericRecord r = recordWritable.getRecord();
@@ -127,7 +131,9 @@ class AvroDeserializer {
if(!r.getSchema().equals(readerSchema)) {
LOG.warn("Received different schemas. Have to re-encode: " +
r.getSchema().toString(false));
- if(reEncoder == null) reEncoder = new SchemaReEncoder();
+ if(reEncoder == null) {
+ reEncoder = new SchemaReEncoder();
+ }
r = reEncoder.reencode(r, readerSchema);
}
@@ -156,25 +162,49 @@ class AvroDeserializer {
// Klaxon! Klaxon! Klaxon!
// Avro requires NULLable types to be defined as unions of some type T
// and NULL. This is annoying and we're going to hide it from the user.
- if(AvroSerdeUtils.isNullableType(recordSchema))
+ if(AvroSerdeUtils.isNullableType(recordSchema)) {
return deserializeNullableUnion(datum, recordSchema, columnType);
+ }
- if(columnType == TypeInfoFactory.stringTypeInfo)
- return datum.toString(); // To workaround AvroUTF8
- // This also gets us around the Enum issue since we just take the value
- // and convert it to a string. Yay!
switch(columnType.getCategory()) {
case STRUCT:
return deserializeStruct((GenericData.Record) datum, (StructTypeInfo) columnType);
- case UNION:
+ case UNION:
return deserializeUnion(datum, recordSchema, (UnionTypeInfo) columnType);
case LIST:
return deserializeList(datum, recordSchema, (ListTypeInfo) columnType);
case MAP:
return deserializeMap(datum, recordSchema, (MapTypeInfo) columnType);
+ case PRIMITIVE:
+ return deserializePrimitive(datum, recordSchema, (PrimitiveTypeInfo) columnType);
default:
- return datum; // Simple type.
+ throw new AvroSerdeException("Unknown TypeInfo: " + columnType.getCategory());
+ }
+ }
+
+ private Object deserializePrimitive(Object datum, Schema recordSchema,
+ PrimitiveTypeInfo columnType) throws AvroSerdeException {
+ switch (columnType.getPrimitiveCategory()){
+ case STRING:
+ return datum.toString(); // To workaround AvroUTF8
+ // This also gets us around the Enum issue since we just take the value
+ // and convert it to a string. Yay!
+ case BINARY:
+ if (recordSchema.getType() == Type.FIXED){
+ Fixed fixed = (Fixed) datum;
+ return fixed.bytes();
+ } else if (recordSchema.getType() == Type.BYTES){
+ ByteBuffer bb = (ByteBuffer) datum;
+ bb.rewind();
+ byte[] result = new byte[bb.limit()];
+ bb.get(result);
+ return result;
+ } else {
+ throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType());
+ }
+ default:
+ return datum;
}
}
@@ -186,8 +216,9 @@ class AvroDeserializer {
TypeInfo columnType) throws AvroSerdeException {
int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
Schema schema = recordSchema.getTypes().get(tag);
- if(schema.getType().equals(Schema.Type.NULL))
+ if(schema.getType().equals(Schema.Type.NULL)) {
return null;
+ }
return worker(datum, schema, SchemaToTypeInfo.generateTypeInfo(schema));
}
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java Mon Aug 5 22:31:28 2013
@@ -18,9 +18,17 @@
package org.apache.hadoop.hive.serde2.avro;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,15 +46,6 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.Writable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.avro.Schema.Type.BYTES;
-import static org.apache.avro.Schema.Type.FIXED;
-
class AvroSerializer {
private static final Log LOG = LogFactory.getLog(AvroSerializer.class);
@@ -67,12 +66,14 @@ class AvroSerializer {
GenericData.Record record = new GenericData.Record(schema);
List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
- if(outputFieldRefs.size() != columnNames.size())
+ if(outputFieldRefs.size() != columnNames.size()) {
throw new AvroSerdeException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size());
+ }
int size = schema.getFields().size();
- if(outputFieldRefs.size() != size) // Hive does this check for us, so we should be ok.
+ if(outputFieldRefs.size() != size) {
throw new AvroSerdeException("Hive passed in a different number of fields than the schema expected: (Hive wanted " + outputFieldRefs.size() +", Avro expected " + schema.getFields().size());
+ }
List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
@@ -88,8 +89,9 @@ class AvroSerializer {
record.put(field.name(), val);
}
- if(!GenericData.get().validate(schema, record))
+ if(!GenericData.get().validate(schema, record)) {
throw new SerializeToAvroException(schema, record);
+ }
cache.setRecord(record);
@@ -111,7 +113,7 @@ class AvroSerializer {
switch(typeInfo.getCategory()) {
case PRIMITIVE:
assert fieldOI instanceof PrimitiveObjectInspector;
- return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData);
+ return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData, schema);
case MAP:
assert fieldOI instanceof MapObjectInspector;
assert typeInfo instanceof MapTypeInfo;
@@ -153,7 +155,7 @@ class AvroSerializer {
};
private Object serializeEnum(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
- return enums.retrieve(schema).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData));
+ return enums.retrieve(schema).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema));
}
private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) throws AvroSerdeException {
@@ -176,14 +178,24 @@ class AvroSerializer {
return record;
}
- private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException {
+ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
switch(fieldOI.getPrimitiveCategory()) {
- case UNKNOWN:
- throw new AvroSerdeException("Received UNKNOWN primitive category.");
- case VOID:
- return null;
- default: // All other primitive types are simple
- return fieldOI.getPrimitiveJavaObject(structFieldData);
+ case BINARY:
+ if (schema.getType() == Type.BYTES){
+ ByteBuffer bb = ByteBuffer.wrap((byte[])fieldOI.getPrimitiveJavaObject(structFieldData));
+ return bb.rewind();
+ } else if (schema.getType() == Type.FIXED){
+ Fixed fixed = new GenericData.Fixed(schema, (byte[])fieldOI.getPrimitiveJavaObject(structFieldData));
+ return fixed;
+ } else {
+ throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + schema.getType());
+ }
+ case UNKNOWN:
+ throw new AvroSerdeException("Received UNKNOWN primitive category.");
+ case VOID:
+ return null;
+ default: // All other primitive types are simple
+ return fieldOI.getPrimitiveJavaObject(structFieldData);
}
}
@@ -197,53 +209,7 @@ class AvroSerializer {
schema.getTypes().get(tag));
}
- // We treat FIXED and BYTES as arrays of tinyints within Hive. Check
- // if we're dealing with either of these types and thus need to serialize
- // them as their Avro types.
- private boolean isTransformedType(Schema schema) {
- return schema.getType().equals(FIXED) || schema.getType().equals(BYTES);
- }
-
- private Object serializeTransformedType(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Beginning to transform " + typeInfo + " with Avro schema " + schema.toString(false));
- }
- if(schema.getType().equals(FIXED)) return serializedAvroFixed(typeInfo, fieldOI, structFieldData, schema);
- else return serializeAvroBytes(typeInfo, fieldOI, structFieldData, schema);
-
- }
-
- private Object serializeAvroBytes(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
- ByteBuffer bb = ByteBuffer.wrap(extraByteArray(fieldOI, structFieldData));
- return bb.rewind();
- }
-
- private Object serializedAvroFixed(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
- return new GenericData.Fixed(schema, extraByteArray(fieldOI, structFieldData));
- }
-
- // For transforming to BYTES and FIXED, pull out the byte array Avro will want
- private byte[] extraByteArray(ListObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException {
- // Grab a book. This is going to be slow.
- int listLength = fieldOI.getListLength(structFieldData);
- byte[] bytes = new byte[listLength];
- assert fieldOI.getListElementObjectInspector() instanceof PrimitiveObjectInspector;
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector)fieldOI.getListElementObjectInspector();
- List<?> list = fieldOI.getList(structFieldData);
-
- for(int i = 0; i < listLength; i++) {
- Object b = poi.getPrimitiveJavaObject(list.get(i));
- if(!(b instanceof Byte))
- throw new AvroSerdeException("Attempting to transform to bytes, element was not byte but " + b.getClass().getCanonicalName());
- bytes[i] = (Byte)b;
- }
- return bytes;
- }
-
private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
- if(isTransformedType(schema))
- return serializeTransformedType(typeInfo, fieldOI, structFieldData, schema);
-
List<?> list = fieldOI.getList(structFieldData);
List<Object> deserialized = new ArrayList<Object>(list.size());
@@ -260,8 +226,9 @@ class AvroSerializer {
private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
// Avro only allows maps with string keys
- if(!mapHasStringKey(fieldOI.getMapKeyObjectInspector()))
+ if(!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) {
throw new AvroSerdeException("Avro only supports maps with keys as Strings. Current Map is: " + typeInfo.toString());
+ }
ObjectInspector mapKeyObjectInspector = fieldOI.getMapKeyObjectInspector();
ObjectInspector mapValueObjectInspector = fieldOI.getMapValueObjectInspector();
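serializePrimitive now goes the other direction for BINARY columns, picking the Avro datum by the target schema: a wrapped ByteBuffer for BYTES, a GenericData.Fixed for FIXED, and an AvroSerdeException otherwise. A sketch of that dispatch, with a hypothetical helper name and sample value rather than the committed code:

import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class AvroBinaryWriteSketch {
  // Hypothetical helper mirroring the BINARY branch of serializePrimitive.
  static Object toAvroBinary(byte[] value, Schema schema) {
    if (schema.getType() == Schema.Type.BYTES) {
      // wrap() already leaves position at 0; rewinding keeps the contract
      // explicit before Avro's writer consumes the buffer.
      return ByteBuffer.wrap(value).rewind();
    } else if (schema.getType() == Schema.Type.FIXED) {
      return new GenericData.Fixed(schema, value);
    }
    throw new IllegalArgumentException("Not a binary schema: " + schema.getType());
  }

  public static void main(String[] args) {
    System.out.println(toAvroBinary(new byte[] {7, 8}, Schema.create(Schema.Type.BYTES)));
  }
}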
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java Mon Aug 5 22:31:28 2013
@@ -17,24 +17,26 @@
*/
package org.apache.hadoop.hive.serde2.avro;
-import org.apache.avro.Schema;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
import static org.apache.avro.Schema.Type.FLOAT;
import static org.apache.avro.Schema.Type.INT;
import static org.apache.avro.Schema.Type.LONG;
import static org.apache.avro.Schema.Type.NULL;
import static org.apache.avro.Schema.Type.STRING;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
/**
* Convert an Avro Schema to a Hive TypeInfo
*/
@@ -47,7 +49,8 @@ class SchemaToTypeInfo {
// long bigint check
// float double check
// double double check
- // bytes
+ // bytes binary check
+ // fixed binary check
// string string check
// tinyint
// smallint
@@ -56,13 +59,15 @@ class SchemaToTypeInfo {
private static final Map<Schema.Type, TypeInfo> primitiveTypeToTypeInfo = initTypeMap();
private static Map<Schema.Type, TypeInfo> initTypeMap() {
Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
- theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
- theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+ theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+ theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
theMap.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
theMap.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
theMap.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
- theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+ theMap.put(BYTES, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+ theMap.put(FIXED, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+ theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
return Collections.unmodifiableMap(theMap);
}
@@ -106,22 +111,22 @@ class SchemaToTypeInfo {
private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException {
// Avro requires NULLable types to be defined as unions of some type T
// and NULL. This is annoying and we're going to hide it from the user.
- if(AvroSerdeUtils.isNullableType(schema))
+ if(AvroSerdeUtils.isNullableType(schema)) {
return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema));
+ }
Schema.Type type = schema.getType();
- if(primitiveTypeToTypeInfo.containsKey(type))
+ if(primitiveTypeToTypeInfo.containsKey(type)) {
return primitiveTypeToTypeInfo.get(type);
+ }
switch(type) {
- case BYTES: return generateBytesTypeInfo(schema);
case RECORD: return generateRecordTypeInfo(schema);
case MAP: return generateMapTypeInfo(schema);
case ARRAY: return generateArrayTypeInfo(schema);
case UNION: return generateUnionTypeInfo(schema);
case ENUM: return generateEnumTypeInfo(schema);
- case FIXED: return generateFixedTypeInfo(schema);
default: throw new AvroSerdeException("Do not yet support: " + schema);
}
}
@@ -183,22 +188,4 @@ class SchemaToTypeInfo {
return TypeInfoFactory.getPrimitiveTypeInfo("string");
}
-
- // Hive doesn't have a Fixed type, so we're going to treat them as arrays of
- // bytes
- // TODO: Make note in documentation that Hive sends these out as signed bytes.
- private static final TypeInfo FIXED_AND_BYTES_EQUIV =
- TypeInfoFactory.getListTypeInfo(TypeInfoFactory.byteTypeInfo);
- private static TypeInfo generateFixedTypeInfo(Schema schema) {
- assert schema.getType().equals(Schema.Type.FIXED);
-
- return FIXED_AND_BYTES_EQUIV;
- }
-
- // Avro considers bytes to be a primitive type, but Hive doesn't. We'll
- // convert them to a list of bytes, just like Fixed. Sigh.
- private static TypeInfo generateBytesTypeInfo(Schema schema) {
- assert schema.getType().equals(Schema.Type.BYTES);
- return FIXED_AND_BYTES_EQUIV;
- }
}
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java Mon Aug 5 22:31:28 2013
@@ -376,7 +376,7 @@ public class BinarySortableSerDe extends
case TIMESTAMP:
TimestampWritable t = (reuse == null ? new TimestampWritable() :
(TimestampWritable) reuse);
- byte[] bytes = new byte[8];
+ byte[] bytes = new byte[TimestampWritable.BINARY_SORTABLE_LENGTH];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = buffer.read(invert);
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java Mon Aug 5 22:31:28 2013
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,8 +58,17 @@ public class TimestampWritable implement
static final public byte[] nullBytes = {0x0, 0x0, 0x0, 0x0};
- private static final int NO_DECIMAL_MASK = 0x7FFFFFFF;
- private static final int HAS_DECIMAL_MASK = 0x80000000;
+ private static final int DECIMAL_OR_SECOND_VINT_FLAG = 0x80000000;
+ private static final int LOWEST_31_BITS_OF_SEC_MASK = 0x7fffffff;
+
+ private static final long SEVEN_BYTE_LONG_SIGN_FLIP = 0xff80L << 48;
+
+ private static final BigDecimal BILLION_BIG_DECIMAL = BigDecimal.valueOf(1000000000);
+
+ /** The maximum number of bytes required for a TimestampWritable */
+ public static final int MAX_BYTES = 13;
+
+ public static final int BINARY_SORTABLE_LENGTH = 11;
private static final ThreadLocal<DateFormat> threadLocalDateFormat =
new ThreadLocal<DateFormat>() {
@@ -82,16 +90,12 @@ public class TimestampWritable implement
/* Allow use of external byte[] for efficiency */
private byte[] currentBytes;
- private final byte[] internalBytes = new byte[9];
+ private final byte[] internalBytes = new byte[MAX_BYTES];
private byte[] externalBytes;
private int offset;
- /* Reused to read VInts */
- static private final VInt vInt = new VInt();
-
/* Constructors */
public TimestampWritable() {
- Arrays.fill(internalBytes, (byte) 0x0);
bytesEmpty = false;
currentBytes = internalBytes;
offset = 0;
@@ -156,11 +160,14 @@ public class TimestampWritable implement
*
* @return seconds corresponding to this TimestampWritable
*/
- public int getSeconds() {
- if (bytesEmpty) {
- return (int) (timestamp.getTime() / 1000);
+ public long getSeconds() {
+ if (!timestampEmpty) {
+ return millisToSeconds(timestamp.getTime());
+ } else if (!bytesEmpty) {
+ return TimestampWritable.getSeconds(currentBytes, offset);
+ } else {
+ throw new IllegalStateException("Both timestamp and bytes are empty");
}
- return TimestampWritable.getSeconds(currentBytes, offset);
}
/**
@@ -170,26 +177,33 @@ public class TimestampWritable implement
public int getNanos() {
if (!timestampEmpty) {
return timestamp.getNanos();
+ } else if (!bytesEmpty) {
+ return hasDecimalOrSecondVInt() ?
+ TimestampWritable.getNanos(currentBytes, offset + 4) : 0;
+ } else {
+ throw new IllegalStateException("Both timestamp and bytes are empty");
}
-
- return hasDecimal() ? TimestampWritable.getNanos(currentBytes, offset+4) : 0;
}
/**
- *
- * @return length of serialized TimestampWritable data
+ * @return length of serialized TimestampWritable data. As a side effect, populates the internal
+ * byte array if empty.
*/
- private int getTotalLength() {
- return 4 + getDecimalLength();
+ int getTotalLength() {
+ checkBytes();
+ return getTotalLength(currentBytes, offset);
}
- /**
- *
- * @return number of bytes the variable length decimal takes up
- */
- private int getDecimalLength() {
- checkBytes();
- return hasDecimal() ? WritableUtils.decodeVIntSize(currentBytes[offset+4]) : 0;
+ public static int getTotalLength(byte[] bytes, int offset) {
+ int len = 4;
+ if (hasDecimalOrSecondVInt(bytes[offset])) {
+ int firstVIntLen = WritableUtils.decodeVIntSize(bytes[offset + 4]);
+ len += firstVIntLen;
+ if (hasSecondVInt(bytes[offset + 4])) {
+ len += WritableUtils.decodeVIntSize(bytes[offset + 4 + firstVIntLen]);
+ }
+ }
+ return len;
}
public Timestamp getTimestamp() {
@@ -215,33 +229,45 @@ public class TimestampWritable implement
/**
* @return byte[] representation of TimestampWritable that is binary
- * sortable (4 byte seconds, 4 bytes for nanoseconds)
+ * sortable (7 bytes for seconds, 4 bytes for nanoseconds)
*/
public byte[] getBinarySortable() {
- byte[] b = new byte[8];
+ byte[] b = new byte[BINARY_SORTABLE_LENGTH];
int nanos = getNanos();
- int seconds = HAS_DECIMAL_MASK | getSeconds();
- intToBytes(seconds, b, 0);
- intToBytes(nanos, b, 4);
+ // We flip the highest-order bit of the seven-byte representation of seconds to make negative
+ // values come before positive ones.
+ long seconds = getSeconds() ^ SEVEN_BYTE_LONG_SIGN_FLIP;
+ sevenByteLongToBytes(seconds, b, 0);
+ intToBytes(nanos, b, 7);
return b;
}
/**
* Given a byte[] that has binary sortable data, initialize the internal
* structures to hold that data
- * @param bytes
- * @param offset
+ * @param bytes the byte array that holds the binary sortable representation
+ * @param binSortOffset offset of the binary-sortable representation within the buffer.
*/
- public void setBinarySortable(byte[] bytes, int offset) {
- int seconds = bytesToInt(bytes, offset);
- int nanos = bytesToInt(bytes, offset+4);
- if (nanos == 0) {
- seconds &= NO_DECIMAL_MASK;
+ public void setBinarySortable(byte[] bytes, int binSortOffset) {
+ // Flip the sign bit (and unused bits of the high-order byte) of the seven-byte long back.
+ long seconds = readSevenByteLong(bytes, binSortOffset) ^ SEVEN_BYTE_LONG_SIGN_FLIP;
+ int nanos = bytesToInt(bytes, binSortOffset + 7);
+ int firstInt = (int) seconds;
+ boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE;
+ if (nanos != 0 || hasSecondVInt) {
+ firstInt |= DECIMAL_OR_SECOND_VINT_FLAG;
} else {
- seconds |= HAS_DECIMAL_MASK;
+ firstInt &= LOWEST_31_BITS_OF_SEC_MASK;
}
- intToBytes(seconds, internalBytes, 0);
- setNanosBytes(nanos, internalBytes, 4);
+
+ intToBytes(firstInt, internalBytes, 0);
+ setNanosBytes(nanos, internalBytes, 4, hasSecondVInt);
+ if (hasSecondVInt) {
+ LazyBinaryUtils.writeVLongToByteArray(internalBytes,
+ 4 + WritableUtils.decodeVIntSize(internalBytes[4]),
+ seconds >> 31);
+ }
+
currentBytes = internalBytes;
this.offset = 0;
}
@@ -268,7 +294,7 @@ public class TimestampWritable implement
public double getDouble() {
double seconds, nanos;
if (bytesEmpty) {
- seconds = timestamp.getTime() / 1000;
+ seconds = millisToSeconds(timestamp.getTime());
nanos = timestamp.getNanos();
} else {
seconds = getSeconds();
@@ -281,10 +307,31 @@ public class TimestampWritable implement
public void readFields(DataInput in) throws IOException {
in.readFully(internalBytes, 0, 4);
- if (TimestampWritable.hasDecimal(internalBytes[0])) {
+ if (TimestampWritable.hasDecimalOrSecondVInt(internalBytes[0])) {
in.readFully(internalBytes, 4, 1);
int len = (byte) WritableUtils.decodeVIntSize(internalBytes[4]);
- in.readFully(internalBytes, 5, len-1);
+ if (len > 1) {
+ in.readFully(internalBytes, 5, len-1);
+ }
+
+ long vlong = LazyBinaryUtils.readVLongFromByteArray(internalBytes, 4);
+ if (vlong < -1000000000 || vlong > 999999999) {
+ throw new IOException(
+ "Invalid first vint value (encoded nanoseconds) of a TimestampWritable: " + vlong +
+ ", expected to be between -1000000000 and 999999999.");
+ // Note that -1000000000 is a valid value corresponding to a nanosecond timestamp
+ // of 999999999, because if the second VInt is present, we use the value
+ // (-reversedNanoseconds - 1) as the second VInt.
+ }
+ if (vlong < 0) {
+ // This indicates there is a second VInt containing the additional bits of the seconds
+ // field.
+ in.readFully(internalBytes, 4 + len, 1);
+ int secondVIntLen = (byte) WritableUtils.decodeVIntSize(internalBytes[4 + len]);
+ if (secondVIntLen > 1) {
+ in.readFully(internalBytes, 5 + len, secondVIntLen - 1);
+ }
+ }
}
currentBytes = internalBytes;
this.offset = 0;
@@ -301,8 +348,8 @@ public class TimestampWritable implement
public int compareTo(TimestampWritable t) {
checkBytes();
- int s1 = this.getSeconds();
- int s2 = t.getSeconds();
+ long s1 = this.getSeconds();
+ long s2 = t.getSeconds();
if (s1 == s2) {
int n1 = this.getNanos();
int n2 = t.getNanos();
@@ -311,7 +358,7 @@ public class TimestampWritable implement
}
return n1 - n2;
} else {
- return s1 - s2;
+ return s1 < s2 ? -1 : 1;
}
}
@@ -342,7 +389,7 @@ public class TimestampWritable implement
@Override
public int hashCode() {
long seconds = getSeconds();
- seconds <<= 32;
+ seconds <<= 30; // the nanosecond part fits in 30 bits
seconds |= getNanos();
return (int) ((seconds >>> 32) ^ seconds);
}
@@ -362,13 +409,30 @@ public class TimestampWritable implement
* @param offset
* @return the number of seconds
*/
- public static int getSeconds(byte[] bytes, int offset) {
- return NO_DECIMAL_MASK & bytesToInt(bytes, offset);
+ public static long getSeconds(byte[] bytes, int offset) {
+ int lowest31BitsOfSecondsAndFlag = bytesToInt(bytes, offset);
+ if (lowest31BitsOfSecondsAndFlag >= 0 || // the "has decimal or second VInt" flag is not set
+ !hasSecondVInt(bytes[offset + 4])) {
+ // The entire seconds field is stored in the first 4 bytes.
+ return lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK;
+ }
+
+ // We compose the seconds field from two parts. The lowest 31 bits come from the first four
+ // bytes. The higher-order bits come from the second VInt that follows the nanos field.
+ return ((long) (lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK)) |
+ (LazyBinaryUtils.readVLongFromByteArray(bytes,
+ offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4])) << 31);
}
public static int getNanos(byte[] bytes, int offset) {
+ VInt vInt = LazyBinaryUtils.threadLocalVInt.get();
LazyBinaryUtils.readVInt(bytes, offset, vInt);
int val = vInt.value;
+ if (val < 0) {
+ // This means there is a second VInt present that specifies additional bits of the timestamp.
+ // The reversed nanoseconds value is still encoded in this VInt.
+ val = -val - 1;
+ }
int len = (int) Math.floor(Math.log10(val)) + 1;
// Reverse the value
@@ -387,40 +451,33 @@ public class TimestampWritable implement
}
/**
- * Writes a Timestamp's serialized value to byte array b at
- * @param t
- * @param b
+ * Writes a Timestamp's serialized value to byte array b at the given offset
+ * @param timestamp to convert to bytes
+ * @param b destination byte array
+ * @param offset destination offset in the byte array
*/
public static void convertTimestampToBytes(Timestamp t, byte[] b,
int offset) {
- if (b.length < 9) {
- LOG.error("byte array too short");
- }
long millis = t.getTime();
int nanos = t.getNanos();
- boolean hasDecimal = nanos != 0 && setNanosBytes(nanos, b, offset+4);
- setSecondsBytes(millis, b, offset, hasDecimal);
- }
-
- /**
- * Given an integer representing seconds, write its serialized
- * value to the byte array b at offset
- * @param millis
- * @param b
- * @param offset
- * @param hasDecimal
- */
- private static void setSecondsBytes(long millis, byte[] b, int offset, boolean hasDecimal) {
- int seconds = (int) (millis / 1000);
-
- if (!hasDecimal) {
- seconds &= NO_DECIMAL_MASK;
+ long seconds = millisToSeconds(millis);
+ boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE;
+ boolean hasDecimal = setNanosBytes(nanos, b, offset+4, hasSecondVInt);
+
+ int firstInt = (int) seconds;
+ if (hasDecimal || hasSecondVInt) {
+ firstInt |= DECIMAL_OR_SECOND_VINT_FLAG;
} else {
- seconds |= HAS_DECIMAL_MASK;
+ firstInt &= LOWEST_31_BITS_OF_SEC_MASK;
}
+ intToBytes(firstInt, b, offset);
- intToBytes(seconds, b, offset);
+ if (hasSecondVInt) {
+ LazyBinaryUtils.writeVLongToByteArray(b,
+ offset + 4 + WritableUtils.decodeVIntSize(b[offset + 4]),
+ seconds >> 31);
+ }
}
/**
@@ -432,7 +489,7 @@ public class TimestampWritable implement
* @param offset
* @return
*/
- private static boolean setNanosBytes(int nanos, byte[] b, int offset) {
+ private static boolean setNanosBytes(int nanos, byte[] b, int offset, boolean hasSecondVInt) {
int decimal = 0;
if (nanos != 0) {
int counter = 0;
@@ -444,7 +501,11 @@ public class TimestampWritable implement
}
}
- LazyBinaryUtils.writeVLongToByteArray(b, offset, decimal);
+ if (hasSecondVInt || decimal != 0) {
+ // We use the sign of the reversed-nanoseconds field to indicate that there is a second VInt
+ // present.
+ LazyBinaryUtils.writeVLongToByteArray(b, offset, hasSecondVInt ? (-decimal - 1) : decimal);
+ }
return decimal != 0;
}
@@ -458,11 +519,14 @@ public class TimestampWritable implement
}
public static Timestamp decimalToTimestamp(HiveDecimal d) {
- BigDecimal seconds = new BigDecimal(d.longValue());
- long millis = d.bigDecimalValue().multiply(new BigDecimal(1000)).longValue();
- int nanos = d.bigDecimalValue().subtract(seconds).multiply(new BigDecimal(1000000000)).intValue();
-
- Timestamp t = new Timestamp(millis);
+ BigDecimal nanoInstant = d.bigDecimalValue().multiply(BILLION_BIG_DECIMAL);
+ int nanos = nanoInstant.remainder(BILLION_BIG_DECIMAL).intValue();
+ if (nanos < 0) {
+ nanos += 1000000000;
+ }
+ long seconds =
+ nanoInstant.subtract(new BigDecimal(nanos)).divide(BILLION_BIG_DECIMAL).longValue();
+ Timestamp t = new Timestamp(seconds * 1000);
t.setNanos(nanos);
return t;
@@ -480,6 +544,10 @@ public class TimestampWritable implement
// Convert to millis
long millis = seconds * 1000;
+ if (nanos < 0) {
+ millis -= 1000;
+ nanos += 1000000000;
+ }
Timestamp t = new Timestamp(millis);
// Set remaining fractional portion to nanos
@@ -488,10 +556,19 @@ public class TimestampWritable implement
}
public static void setTimestamp(Timestamp t, byte[] bytes, int offset) {
- boolean hasDecimal = hasDecimal(bytes[offset]);
- t.setTime(((long) TimestampWritable.getSeconds(bytes, offset)) * 1000);
- if (hasDecimal) {
- t.setNanos(TimestampWritable.getNanos(bytes, offset+4));
+ boolean hasDecimalOrSecondVInt = hasDecimalOrSecondVInt(bytes[offset]);
+ long seconds = (long) TimestampWritable.getSeconds(bytes, offset);
+ int nanos = 0;
+ if (hasDecimalOrSecondVInt) {
+ nanos = TimestampWritable.getNanos(bytes, offset + 4);
+ if (hasSecondVInt(bytes[offset + 4])) {
+ seconds += LazyBinaryUtils.readVLongFromByteArray(bytes,
+ offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4]));
+ }
+ }
+ t.setTime(seconds * 1000);
+ if (nanos != 0) {
+ t.setNanos(nanos);
}
}
@@ -501,17 +578,22 @@ public class TimestampWritable implement
return t;
}
- public boolean hasDecimal() {
- return hasDecimal(currentBytes[offset]);
+ private static boolean hasDecimalOrSecondVInt(byte b) {
+ return (b >> 7) != 0;
}
- /**
- *
- * @param b first byte in an encoded TimestampWritable
- * @return true if it has a decimal portion, false otherwise
- */
- public static boolean hasDecimal(byte b) {
- return (b >> 7) != 0;
+ private static boolean hasSecondVInt(byte b) {
+ return WritableUtils.isNegativeVInt(b);
+ }
+
+ private final boolean hasDecimalOrSecondVInt() {
+ return hasDecimalOrSecondVInt(currentBytes[offset]);
+ }
+
+ public final boolean hasDecimal() {
+ return hasDecimalOrSecondVInt() || currentBytes[offset + 4] != -1;
+ // If the first byte of the VInt is -1, the VInt itself is -1, indicating that there is a
+ // second VInt but the nanoseconds field is actually 0.
}
/**
@@ -528,6 +610,20 @@ public class TimestampWritable implement
}
/**
+ * Writes <code>value</code> into <code>dest</code> at <code>offset</code> as a seven-byte
+ * serialized long number.
+ */
+ static void sevenByteLongToBytes(long value, byte[] dest, int offset) {
+ dest[offset] = (byte) ((value >> 48) & 0xFF);
+ dest[offset+1] = (byte) ((value >> 40) & 0xFF);
+ dest[offset+2] = (byte) ((value >> 32) & 0xFF);
+ dest[offset+3] = (byte) ((value >> 24) & 0xFF);
+ dest[offset+4] = (byte) ((value >> 16) & 0xFF);
+ dest[offset+5] = (byte) ((value >> 8) & 0xFF);
+ dest[offset+6] = (byte) (value & 0xFF);
+ }
+
+ /**
*
* @param bytes
* @param offset
@@ -540,4 +636,27 @@ public class TimestampWritable implement
| ((0xFF & bytes[offset+2]) << 8)
| (0xFF & bytes[offset+3]);
}
+
+ static long readSevenByteLong(byte[] bytes, int offset) {
+ // We need to shift everything 8 bits left and then shift back to populate the sign field.
+ return (((0xFFL & bytes[offset]) << 56)
+ | ((0xFFL & bytes[offset+1]) << 48)
+ | ((0xFFL & bytes[offset+2]) << 40)
+ | ((0xFFL & bytes[offset+3]) << 32)
+ | ((0xFFL & bytes[offset+4]) << 24)
+ | ((0xFFL & bytes[offset+5]) << 16)
+ | ((0xFFL & bytes[offset+6]) << 8)) >> 8;
+ }
+
+ /**
+ * Rounds the number of milliseconds relative to the epoch down to the nearest whole number of
+ * seconds. 500 would round to 0, -500 would round to -1.
+ */
+ static long millisToSeconds(long millis) {
+ if (millis >= 0) {
+ return millis / 1000;
+ } else {
+ return (millis - 999) / 1000;
+ }
+ }
}
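Note that millisToSeconds floors rather than truncates: Java's / operator rounds toward zero,
which would put -500 ms into second 0 instead of second -1 and shift every pre-epoch timestamp
by one bucket. Math.floorDiv computes the same thing but only arrived in Java 8, hence the
explicit branch. A quick fragment:

    System.out.println(-500 / 1000);                  // 0, truncation, wrong bucket
    System.out.println((-500L - 999L) / 1000L);       // -1, the branch above
    System.out.println(Math.floorDiv(-500L, 1000L));  // -1, Java 8+ equivalent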
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java Mon Aug 5 22:31:28 2013
@@ -202,10 +202,7 @@ public final class LazyBinaryUtils {
break;
case TIMESTAMP:
recordInfo.elementOffset = 0;
- recordInfo.elementSize = 4;
- if(TimestampWritable.hasDecimal(bytes[offset])) {
- recordInfo.elementSize += (byte) WritableUtils.decodeVIntSize(bytes[offset+4]);
- }
+ recordInfo.elementSize = TimestampWritable.getTotalLength(bytes, offset);
break;
case DECIMAL:
// using vint instead of 4 bytes
@@ -285,6 +282,13 @@ public final class LazyBinaryUtils {
public byte length;
};
+ public static final ThreadLocal<VInt> threadLocalVInt = new ThreadLocal<VInt>() {
+ @Override
+ protected VInt initialValue() {
+ return new VInt();
+ }
+ };
+
/**
* Reads a zero-compressed encoded int from a byte array and returns it.
*
@@ -324,6 +328,28 @@ public final class LazyBinaryUtils {
}
/**
+ * Read a zero-compressed encoded long from a byte array.
+ *
+ * @param bytes the byte array
+ * @param offset the offset in the byte array where the VLong is stored
+ * @return the long
+ */
+ public static long readVLongFromByteArray(final byte[] bytes, int offset) {
+ byte firstByte = bytes[offset++];
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len-1; idx++) {
+ byte b = bytes[offset++];
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
+ }
+
+ /**
* Write a zero-compressed encoded long to a byte array.
*
* @param bytes
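readVLongFromByteArray is an array-backed twin of WritableUtils.readVLong, added so the
timestamp code can decode the optional second VInt without wrapping the buffer in a DataInput;
the threadLocalVInt above it likewise avoids allocating a scratch VInt per call while keeping
the shared static thread-safe. A hedged roundtrip sketch against the stock Hadoop encoder
(assumes only the standard WritableUtils API plus the class above on the classpath):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
    import org.apache.hadoop.io.WritableUtils;

    public class VLongRoundtrip {
      public static void main(String[] args) throws Exception {
        long[] samples = {0L, 127L, -112L, 128L, -113L, 1234567890123L, Long.MIN_VALUE};
        for (long v : samples) {
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          WritableUtils.writeVLong(new DataOutputStream(baos), v);  // stock encoder
          long decoded = LazyBinaryUtils.readVLongFromByteArray(baos.toByteArray(), 0);
          if (decoded != v) {
            throw new AssertionError(v + " decoded as " + decoded);
          }
        }
        System.out.println("vlong roundtrip ok");
      }
    }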
Modified: hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java (original)
+++ hive/branches/tez/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java Mon Aug 5 22:31:28 2013
@@ -323,6 +323,10 @@ public class PrimitiveObjectInspectorCon
@Override
public Object convert(Object input) {
+ if (input == null) {
+ return null;
+ }
+
return outputOI.set(r, PrimitiveObjectInspectorUtils.getBinary(input,
inputOI));
}
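This guard makes the binary converter null-preserving instead of failing inside getBinary
(presumably with a NullPointerException before the change). In use, with the same factory and
inspectors the converter tests below rely on (fragment, imports elided):

    Converter toBinary = ObjectInspectorConverters.getConverter(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.writableBinaryObjectInspector);
    toBinary.convert("hive");  // BytesWritable over {'h','i','v','e'}
    toBinary.convert(null);    // now null, no exception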
Modified: hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java (original)
+++ hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java Mon Aug 5 22:31:28 2013
@@ -17,6 +17,18 @@
*/
package org.apache.hadoop.hive.serde2.avro;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -30,18 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
import org.junit.Test;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
public class TestAvroDeserializer {
private final GenericData GENERIC_DATA = GenericData.get();
@@ -338,12 +338,12 @@ public class TestAvroDeserializer {
ArrayList<Object> row =
(ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
assertEquals(1, row.size());
- Object theArrayObject = row.get(0);
- assertTrue(theArrayObject instanceof List);
- List theList = (List)theArrayObject;
+ Object byteObject = row.get(0);
+ assertTrue(byteObject instanceof byte[]);
+ byte[] outBytes = (byte[]) byteObject;
// Verify the raw object that's been created
for(int i = 0; i < bytes.length; i++) {
- assertEquals(bytes[i], theList.get(i));
+ assertEquals(bytes[i], outBytes[i]);
}
// Now go the correct way, through objectinspectors
@@ -352,9 +352,9 @@ public class TestAvroDeserializer {
assertEquals(1, fieldsDataAsList.size());
StructField fieldRef = oi.getStructFieldRef("hash");
- List theList2 = (List)oi.getStructFieldData(row, fieldRef);
- for(int i = 0; i < bytes.length; i++) {
- assertEquals(bytes[i], theList2.get(i));
+ outBytes = (byte[]) oi.getStructFieldData(row, fieldRef);
+ for(int i = 0; i < outBytes.length; i++) {
+ assertEquals(bytes[i], outBytes[i]);
}
}
@@ -377,8 +377,13 @@ public class TestAvroDeserializer {
ArrayList<Object> row =
(ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
assertEquals(1, row.size());
- Object theArrayObject = row.get(0);
- assertTrue(theArrayObject instanceof List);
+ Object byteObject = row.get(0);
+ assertTrue(byteObject instanceof byte[]);
+ byte[] outBytes = (byte[]) byteObject;
+ // Verify the raw object that's been created
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], outBytes[i]);
+ }
// Now go the correct way, through objectinspectors
StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
@@ -386,9 +391,9 @@ public class TestAvroDeserializer {
assertEquals(1, fieldsDataAsList.size());
StructField fieldRef = oi.getStructFieldRef("bytesField");
- List theList2 = (List)oi.getStructFieldData(row, fieldRef);
- for(int i = 0; i < bytes.length; i++) {
- assertEquals(bytes[i], theList2.get(i));
+ outBytes = (byte[]) oi.getStructFieldData(row, fieldRef);
+ for(int i = 0; i < outBytes.length; i++) {
+ assertEquals(bytes[i], outBytes[i]);
}
}
@@ -489,9 +494,10 @@ public class TestAvroDeserializer {
ObjectInspector fieldObjectInspector = fieldRef.getFieldObjectInspector();
StringObjectInspector soi = (StringObjectInspector)fieldObjectInspector;
- if(expected == null)
+ if(expected == null) {
assertNull(soi.getPrimitiveJavaObject(rowElement));
- else
+ } else {
assertEquals(expected, soi.getPrimitiveJavaObject(rowElement));
+ }
}
}
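Since the deserializer now hands back byte[] directly, the index-by-index loops above could
also collapse into JUnit's array assertion, which additionally verifies that the two lengths
match, e.g.:

    import static org.junit.Assert.assertArrayEquals;

    assertArrayEquals(bytes, outBytes);  // replaces each comparison loop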
Modified: hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java (original)
+++ hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java Mon Aug 5 22:31:28 2013
@@ -17,10 +17,18 @@
*/
package org.apache.hadoop.hive.serde2.avro;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -32,13 +40,6 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class TestAvroObjectInspectorGenerator {
private final TypeInfo STRING = TypeInfoFactory.getPrimitiveTypeInfo("string");
private final TypeInfo INT = TypeInfoFactory.getPrimitiveTypeInfo("int");
@@ -353,7 +354,7 @@ public class TestAvroObjectInspectorGene
AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
verifyMap(aoig, "aMap");
}
-
+
/**
* Check a given AvroObjectInspectorGenerator to verify that it matches our test
* schema's expected map.
@@ -476,10 +477,8 @@ public class TestAvroObjectInspectorGene
// Column types
assertEquals(1, aoig.getColumnTypes().size());
TypeInfo typeInfo = aoig.getColumnTypes().get(0);
- assertTrue(typeInfo instanceof ListTypeInfo);
- ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
- assertTrue(listTypeInfo.getListElementTypeInfo() instanceof PrimitiveTypeInfo);
- assertEquals("tinyint", listTypeInfo.getListElementTypeInfo().getTypeName());
+ assertTrue(typeInfo instanceof PrimitiveTypeInfo);
+ assertEquals(PrimitiveCategory.BINARY, ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
}
@Test // Avro considers bytes primitive, Hive doesn't. Map them to Hive's binary type.
@@ -495,10 +494,8 @@ public class TestAvroObjectInspectorGene
// Column types
assertEquals(1, aoig.getColumnTypes().size());
TypeInfo typeInfo = aoig.getColumnTypes().get(0);
- assertTrue(typeInfo instanceof ListTypeInfo);
- ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
- assertTrue(listTypeInfo.getListElementTypeInfo() instanceof PrimitiveTypeInfo);
- assertEquals("tinyint", listTypeInfo.getListElementTypeInfo().getTypeName());
+ assertTrue(typeInfo instanceof PrimitiveTypeInfo);
+ assertEquals(PrimitiveCategory.BINARY, ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
}
@Test // That Union[T, NULL] is converted to just T.
Modified: hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorConverters.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorConverters.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorConverters.java (original)
+++ hive/branches/tez/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorConverters.java Mon Aug 5 22:31:28 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.serde2.ob
import junit.framework.TestCase;
-
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -49,6 +48,7 @@ public class TestObjectInspectorConverte
booleanConverter.convert(Integer.valueOf(0)));
assertEquals("BooleanConverter", new BooleanWritable(true),
booleanConverter.convert(Integer.valueOf(1)));
+ assertEquals("BooleanConverter", null, booleanConverter.convert(null));
// Byte
Converter byteConverter = ObjectInspectorConverters.getConverter(
@@ -58,6 +58,7 @@ public class TestObjectInspectorConverte
.convert(Integer.valueOf(0)));
assertEquals("ByteConverter", new ByteWritable((byte) 1), byteConverter
.convert(Integer.valueOf(1)));
+ assertEquals("ByteConverter", null, byteConverter.convert(null));
// Short
Converter shortConverter = ObjectInspectorConverters.getConverter(
@@ -67,6 +68,7 @@ public class TestObjectInspectorConverte
shortConverter.convert(Integer.valueOf(0)));
assertEquals("ShortConverter", new ShortWritable((short) 1),
shortConverter.convert(Integer.valueOf(1)));
+ assertEquals("ShortConverter", null, shortConverter.convert(null));
// Int
Converter intConverter = ObjectInspectorConverters.getConverter(
@@ -76,6 +78,7 @@ public class TestObjectInspectorConverte
.convert(Integer.valueOf(0)));
assertEquals("IntConverter", new IntWritable(1), intConverter
.convert(Integer.valueOf(1)));
+ assertEquals("IntConverter", null, intConverter.convert(null));
// Long
Converter longConverter = ObjectInspectorConverters.getConverter(
@@ -85,6 +88,7 @@ public class TestObjectInspectorConverte
.convert(Integer.valueOf(0)));
assertEquals("LongConverter", new LongWritable(1), longConverter
.convert(Integer.valueOf(1)));
+ assertEquals("LongConverter", null, longConverter.convert(null));
// Float
Converter floatConverter = ObjectInspectorConverters.getConverter(
@@ -94,6 +98,7 @@ public class TestObjectInspectorConverte
.convert(Integer.valueOf(0)));
assertEquals("LongConverter", new FloatWritable(1), floatConverter
.convert(Integer.valueOf(1)));
+ assertEquals("LongConverter", null, floatConverter.convert(null));
// Double
Converter doubleConverter = ObjectInspectorConverters.getConverter(
@@ -103,6 +108,7 @@ public class TestObjectInspectorConverte
.convert(Integer.valueOf(0)));
assertEquals("DoubleConverter", new DoubleWritable(1), doubleConverter
.convert(Integer.valueOf(1)));
+ assertEquals("DoubleConverter", null, doubleConverter.convert(null));
// Text
Converter textConverter = ObjectInspectorConverters.getConverter(
@@ -112,27 +118,36 @@ public class TestObjectInspectorConverte
.convert(Integer.valueOf(0)));
assertEquals("TextConverter", new Text("1"), textConverter
.convert(Integer.valueOf(1)));
+ assertEquals("TextConverter", null, textConverter.convert(null));
+
textConverter = ObjectInspectorConverters.getConverter(
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
PrimitiveObjectInspectorFactory.writableStringObjectInspector);
assertEquals("TextConverter", new Text("hive"), textConverter
.convert(new BytesWritable(new byte[]
{(byte)'h', (byte)'i',(byte)'v',(byte)'e'})));
+ assertEquals("TextConverter", null, textConverter.convert(null));
+
textConverter = ObjectInspectorConverters.getConverter(
PrimitiveObjectInspectorFactory.writableStringObjectInspector,
PrimitiveObjectInspectorFactory.writableStringObjectInspector);
assertEquals("TextConverter", new Text("hive"), textConverter
.convert(new Text("hive")));
+ assertEquals("TextConverter", null, textConverter.convert(null));
+
textConverter = ObjectInspectorConverters.getConverter(
PrimitiveObjectInspectorFactory.javaStringObjectInspector,
PrimitiveObjectInspectorFactory.writableStringObjectInspector);
assertEquals("TextConverter", new Text("hive"), textConverter
.convert(new String("hive")));
+ assertEquals("TextConverter", null, textConverter.convert(null));
+
textConverter = ObjectInspectorConverters.getConverter(
PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
PrimitiveObjectInspectorFactory.writableStringObjectInspector);
assertEquals("TextConverter", new Text("100.001"), textConverter
.convert(new HiveDecimal("100.001")));
+ assertEquals("TextConverter", null, textConverter.convert(null));
// Binary
Converter baConverter = ObjectInspectorConverters.getConverter(
@@ -141,12 +156,15 @@ public class TestObjectInspectorConverte
assertEquals("BAConverter", new BytesWritable(new byte[]
{(byte)'h', (byte)'i',(byte)'v',(byte)'e'}),
baConverter.convert("hive"));
+ assertEquals("BAConverter", null, baConverter.convert(null));
+
baConverter = ObjectInspectorConverters.getConverter(
PrimitiveObjectInspectorFactory.writableStringObjectInspector,
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector);
assertEquals("BAConverter", new BytesWritable(new byte[]
{(byte)'h', (byte)'i',(byte)'v',(byte)'e'}),
baConverter.convert(new Text("hive")));
+ assertEquals("BAConverter", null, baConverter.convert(null));
} catch (Throwable e) {
e.printStackTrace();
throw e;
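Every primitive converter is now expected to map null to null. The repeated per-converter
assertions above could be factored through a small helper; a hypothetical sketch, not part of
this patch:

    // Hypothetical helper: asserts the null-in/null-out contract once.
    private static void assertNullPassthrough(String label, Converter c) {
      assertEquals(label, null, c.convert(null));
    }

    // usage: assertNullPassthrough("BooleanConverter", booleanConverter);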
Modified: hive/branches/tez/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/tez/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Mon Aug 5 22:31:28 2013
@@ -594,6 +594,11 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public Path createDelegationTokenFile(Configuration conf) throws IOException {
+ throw new UnsupportedOperationException("Tokens are not supported in current hadoop version");
+ }
+
+ @Override
public UserGroupInformation createRemoteUser(String userName, List<String> groupNames) {
return new UnixUserGroupInformation(userName, groupNames.toArray(new String[0]));
}
@@ -707,4 +712,11 @@ public class Hadoop20Shims implements Ha
public short getDefaultReplication(FileSystem fs, Path path) {
return fs.getDefaultReplication();
}
+
+ @Override
+ public String getTokenFileLocEnvName() {
+ throw new UnsupportedOperationException(
+ "Kerberos not supported in current hadoop version");
+ }
+
}
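Both new methods simply throw on Hadoop 0.20, so token-aware callers have to probe and degrade.
A caller-side sketch, not from this commit (imports elided; conf is assumed to be a
Configuration in scope, and ShimLoader is Hive's standard shim factory):

    Path tokenFile;
    try {
      tokenFile = ShimLoader.getHadoopShims().createDelegationTokenFile(conf);
    } catch (UnsupportedOperationException e) {
      tokenFile = null;  // Hadoop 0.20: no token support, fall back to the unsecured path
    }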
Modified: hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original)
+++ hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Mon Aug 5 22:31:28 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -525,6 +527,26 @@ public abstract class HadoopShimsSecure
}
@Override
+ public Path createDelegationTokenFile(Configuration conf) throws IOException {
+
+ // Get a filesystem delegation token for the current login user.
+ String uname = UserGroupInformation.getLoginUser().getShortUserName();
+ FileSystem fs = FileSystem.get(conf);
+ Token<?> fsToken = fs.getDelegationToken(uname);
+
+ File t = File.createTempFile("hive_hadoop_delegation_token", null);
+ Path tokenPath = new Path(t.toURI());
+
+ // Wrap the token in a Credentials object and persist it to the file.
+ Credentials cred = new Credentials();
+ cred.addToken(fsToken.getService(), fsToken);
+ cred.writeTokenStorageFile(tokenPath, conf);
+
+ return tokenPath;
+ }
+
+ @Override
public UserGroupInformation createProxyUser(String userName) throws IOException {
return UserGroupInformation.createProxyUser(
userName, UserGroupInformation.getLoginUser());
@@ -556,6 +578,11 @@ public abstract class HadoopShimsSecure
}
@Override
+ public String getTokenFileLocEnvName() {
+ return UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+ }
+
+ @Override
abstract public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception;
@Override
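Taken together, the secure shim writes the login user's filesystem delegation token into a temp
file and exposes the environment-variable name (UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION)
from which a child JVM's UGI reads credentials back. A hedged sketch of how the two methods
compose when spawning a process; the ProcessBuilder usage is illustrative only, and childCommand
and conf are assumed to exist:

    HadoopShims shims = ShimLoader.getHadoopShims();
    Path tokenFile = shims.createDelegationTokenFile(conf);
    ProcessBuilder pb = new ProcessBuilder(childCommand);
    // The child picks its credentials up from this environment variable.
    pb.environment().put(shims.getTokenFileLocEnvName(), tokenFile.toUri().getPath());
    pb.start();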
Modified: hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1510788&r1=1510787&r2=1510788&view=diff
==============================================================================
--- hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/tez/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Mon Aug 5 22:31:28 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.thrift;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
@@ -40,8 +42,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Client;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -64,8 +64,6 @@ import org.apache.thrift.transport.TTran
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
-
/**
* Functions that bridge Thrift's SASL transports to Hadoop's
* SASL callback handlers and authentication classes.
@@ -359,7 +357,9 @@ import static org.apache.hadoop.fs.Commo
throws IOException, InterruptedException {
if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
throw new AuthorizationException(
- "Delegation Token can be issued only with kerberos authentication");
+ "Delegation Token can be issued only with kerberos authentication. " +
+ "Current AuthenticationMethod: " + authenticationMethod.get()
+ );
}
//if the user asking the token is same as the 'owner' then don't do
//any proxy authorization checks. For cases like oozie, where it gets
@@ -388,7 +388,9 @@ import static org.apache.hadoop.fs.Commo
public long renewDelegationToken(String tokenStrForm) throws IOException {
if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
throw new AuthorizationException(
- "Delegation Token can be issued only with kerberos authentication");
+ "Delegation Token can be issued only with kerberos authentication. " +
+ "Current AuthenticationMethod: " + authenticationMethod.get()
+ );
}
return secretManager.renewDelegationToken(tokenStrForm);
}
@@ -430,7 +432,7 @@ import static org.apache.hadoop.fs.Commo
public String getRemoteUser() {
return remoteUser.get();
}
-
+
/** CallbackHandler for SASL DIGEST-MD5 mechanism */
// This code is pretty much completely based on Hadoop's
// SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not