Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:09 UTC
[1/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master-15147 [created] 682a3c7b4
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
index cef765c..35be661 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
@@ -21,11 +21,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hive.common.DiskRangeInfo;
+import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.orc.impl.SettableUncompressedStream;
import org.apache.orc.impl.BufferChunk;
+import com.google.common.collect.Lists;
+
/**
* Stream utility.
*/
@@ -45,9 +48,13 @@ public class StreamUtils {
return null;
}
- DiskRangeInfo diskRangeInfo = createDiskRangeInfo(streamBuffer);
- return new SettableUncompressedStream(streamName, diskRangeInfo.getDiskRanges(),
- diskRangeInfo.getTotalLength());
+ if (streamBuffer.getCacheBuffers() != null) {
+ DiskRangeInfo diskRangeInfo = createDiskRangeInfo(streamBuffer);
+ return new SettableUncompressedStream(streamName, diskRangeInfo.getDiskRanges(),
+ diskRangeInfo.getTotalLength());
+ } else {
+ return new SettableUncompressedStream(streamName, Lists.<DiskRange>newArrayList(), 0);
+ }
}
/**
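The hunk above makes createSettableUncompressedStream tolerate a ColumnStreamData whose cache buffer list was never populated: instead of dereferencing a null list, it now returns an empty, zero-length stream. A minimal sketch of that control flow, using only the types already imported in this file (not the committed method verbatim):

    // Sketch of the new behavior only.
    static SettableUncompressedStream toStream(String streamName, ColumnStreamData streamBuffer) {
      if (streamBuffer.getCacheBuffers() != null) {
        // Cached buffers exist: expose them as disk ranges, as before.
        DiskRangeInfo info = createDiskRangeInfo(streamBuffer);
        return new SettableUncompressedStream(
            streamName, info.getDiskRanges(), info.getTotalLength());
      }
      // No cached buffers for this stream: hand back an empty stream instead of failing.
      return new SettableUncompressedStream(streamName, Lists.<DiskRange>newArrayList(), 0);
    }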
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index 09e4a47..9a6406d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -88,8 +88,9 @@ public class PartialScanMapper extends MapReduceBase implements
}
try {
- //CombineHiveInputFormat is set in PartialScanTask.
- RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper) ((CombineHiveKey) k).getKey();
+ //CombineHiveInputFormat may be set in PartialScanTask.
+ RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper)
+ ((k instanceof CombineHiveKey) ? ((CombineHiveKey) k).getKey() : k);
// calculate rawdatasize
KeyBuffer keyBuffer = key.getKeyBuffer();
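The change above lets the mapper handle both a key wrapped in CombineHiveKey (when CombineHiveInputFormat is used) and a bare RCFileKeyBufferWrapper. The same guarded unwrap, pulled out as a hypothetical helper for clarity (not part of the commit):

    // Hypothetical helper; mirrors the instanceof guard added in the hunk above.
    private static RCFileKeyBufferWrapper unwrapKey(Object k) {
      Object raw = (k instanceof CombineHiveKey) ? ((CombineHiveKey) k).getKey() : k;
      return (RCFileKeyBufferWrapper) raw;
    }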
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/test/queries/clientpositive/llap_text.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/llap_text.q b/ql/src/test/queries/clientpositive/llap_text.q
new file mode 100644
index 0000000..0ff79e2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/llap_text.q
@@ -0,0 +1,62 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.fetch.task.conversion=none;
+
+SET hive.llap.io.enabled=false;
+set hive.llap.cache.allow.synthetic.fileid=true;
+
+
+DROP TABLE text_llap;
+
+CREATE TABLE text_llap(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN)
+row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ stored as inputformat "org.apache.hadoop.hive.llap.io.decode.LlapTextInputFormat"
+ outputformat "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+
+alter table text_llap set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe';
+
+
+insert into table text_llap
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null;
+
+create table text_llap1 like text_llap;
+create table text_llap100 like text_llap;
+create table text_llap1000 like text_llap;
+
+insert into table text_llap1
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1;
+
+insert into table text_llap100
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 100;
+
+insert into table text_llap1000
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1000;
+
+
+SET hive.llap.io.enabled=true;
+SET hive.vectorized.execution.enabled=true;
+set hive.llap.io.encode.slice.row.count=90;
+
+select ctinyint, cstring1, cboolean2 from text_llap100 order by ctinyint, cstring1, cboolean2;
+select * from text_llap100 order by cint, cstring1, cstring2;
+select csmallint, cstring1, cboolean2 from text_llap100 order by csmallint, cstring1, cboolean2;
+
+
+DROP TABLE text_llap;
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/test/results/clientpositive/llap_text.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap_text.q.out b/ql/src/test/results/clientpositive/llap_text.q.out
new file mode 100644
index 0000000..5660ef1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap_text.q.out
@@ -0,0 +1,502 @@
+PREHOOK: query: DROP TABLE text_llap
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE text_llap
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE text_llap(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN)
+row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ stored as inputformat "org.apache.hadoop.hive.llap.io.decode.LlapTextInputFormat"
+ outputformat "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: CREATE TABLE text_llap(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN)
+row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ stored as inputformat "org.apache.hadoop.hive.llap.io.decode.LlapTextInputFormat"
+ outputformat "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap
+PREHOOK: query: alter table text_llap set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+PREHOOK: type: ALTERTABLE_SERIALIZER
+PREHOOK: Input: default@text_llap
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: alter table text_llap set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+POSTHOOK: type: ALTERTABLE_SERIALIZER
+POSTHOOK: Input: default@text_llap
+POSTHOOK: Output: default@text_llap
+PREHOOK: query: insert into table text_llap
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: insert into table text_llap
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap
+POSTHOOK: Lineage: text_llap.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: create table text_llap1 like text_llap
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap1
+POSTHOOK: query: create table text_llap1 like text_llap
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap1
+PREHOOK: query: create table text_llap100 like text_llap
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap100
+POSTHOOK: query: create table text_llap100 like text_llap
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap100
+PREHOOK: query: create table text_llap1000 like text_llap
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap1000
+POSTHOOK: query: create table text_llap1000 like text_llap
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap1000
+PREHOOK: query: insert into table text_llap1
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap1
+POSTHOOK: query: insert into table text_llap1
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap1
+POSTHOOK: Lineage: text_llap1.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap1.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: insert into table text_llap100
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap100
+POSTHOOK: query: insert into table text_llap100
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap100
+POSTHOOK: Lineage: text_llap100.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap100.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap100.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap100.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap100.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: insert into table text_llap1000
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap1000
+POSTHOOK: query: insert into table text_llap1000
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap1000
+POSTHOOK: Lineage: text_llap1000.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: select ctinyint, cstring1, cboolean2 from text_llap100 order by ctinyint, cstring1, cboolean2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint, cstring1, cboolean2 from text_llap100 order by ctinyint, cstring1, cboolean2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+NULL cvLH6Eat2yFsyy7p NULL
+NULL cvLH6Eat2yFsyy7p NULL
+-62 cvLH6Eat2yFsyy7p NULL
+-59 cvLH6Eat2yFsyy7p NULL
+-57 cvLH6Eat2yFsyy7p NULL
+-56 cvLH6Eat2yFsyy7p NULL
+-56 cvLH6Eat2yFsyy7p NULL
+-55 cvLH6Eat2yFsyy7p NULL
+-55 cvLH6Eat2yFsyy7p NULL
+-54 cvLH6Eat2yFsyy7p NULL
+-53 cvLH6Eat2yFsyy7p NULL
+-51 cvLH6Eat2yFsyy7p NULL
+-50 cvLH6Eat2yFsyy7p NULL
+-48 cvLH6Eat2yFsyy7p NULL
+-48 cvLH6Eat2yFsyy7p NULL
+-47 cvLH6Eat2yFsyy7p NULL
+-45 cvLH6Eat2yFsyy7p NULL
+-45 cvLH6Eat2yFsyy7p NULL
+-44 cvLH6Eat2yFsyy7p NULL
+-43 cvLH6Eat2yFsyy7p NULL
+-40 cvLH6Eat2yFsyy7p NULL
+-37 cvLH6Eat2yFsyy7p NULL
+-36 cvLH6Eat2yFsyy7p NULL
+-34 cvLH6Eat2yFsyy7p NULL
+-34 cvLH6Eat2yFsyy7p NULL
+-33 cvLH6Eat2yFsyy7p NULL
+-33 cvLH6Eat2yFsyy7p NULL
+-32 cvLH6Eat2yFsyy7p NULL
+-30 cvLH6Eat2yFsyy7p NULL
+-28 cvLH6Eat2yFsyy7p NULL
+-28 cvLH6Eat2yFsyy7p NULL
+-24 cvLH6Eat2yFsyy7p NULL
+-23 cvLH6Eat2yFsyy7p NULL
+-23 cvLH6Eat2yFsyy7p NULL
+-23 cvLH6Eat2yFsyy7p NULL
+-22 cvLH6Eat2yFsyy7p NULL
+-22 cvLH6Eat2yFsyy7p NULL
+-22 cvLH6Eat2yFsyy7p NULL
+-21 cvLH6Eat2yFsyy7p NULL
+-21 cvLH6Eat2yFsyy7p NULL
+-19 cvLH6Eat2yFsyy7p NULL
+-16 cvLH6Eat2yFsyy7p NULL
+-16 cvLH6Eat2yFsyy7p NULL
+-13 cvLH6Eat2yFsyy7p NULL
+-12 cvLH6Eat2yFsyy7p NULL
+-11 cvLH6Eat2yFsyy7p NULL
+-11 cvLH6Eat2yFsyy7p NULL
+-11 cvLH6Eat2yFsyy7p NULL
+-7 cvLH6Eat2yFsyy7p NULL
+-5 cvLH6Eat2yFsyy7p NULL
+-5 cvLH6Eat2yFsyy7p NULL
+-5 cvLH6Eat2yFsyy7p NULL
+-4 cvLH6Eat2yFsyy7p NULL
+-4 cvLH6Eat2yFsyy7p NULL
+-1 cvLH6Eat2yFsyy7p NULL
+0 cvLH6Eat2yFsyy7p NULL
+0 cvLH6Eat2yFsyy7p NULL
+2 cvLH6Eat2yFsyy7p NULL
+4 cvLH6Eat2yFsyy7p NULL
+5 cvLH6Eat2yFsyy7p NULL
+8 cvLH6Eat2yFsyy7p NULL
+9 cvLH6Eat2yFsyy7p NULL
+10 cvLH6Eat2yFsyy7p NULL
+13 cvLH6Eat2yFsyy7p NULL
+16 cvLH6Eat2yFsyy7p NULL
+18 cvLH6Eat2yFsyy7p NULL
+19 cvLH6Eat2yFsyy7p NULL
+21 cvLH6Eat2yFsyy7p NULL
+24 cvLH6Eat2yFsyy7p NULL
+24 cvLH6Eat2yFsyy7p NULL
+26 cvLH6Eat2yFsyy7p NULL
+27 cvLH6Eat2yFsyy7p NULL
+27 cvLH6Eat2yFsyy7p NULL
+28 cvLH6Eat2yFsyy7p NULL
+29 cvLH6Eat2yFsyy7p NULL
+29 cvLH6Eat2yFsyy7p NULL
+30 cvLH6Eat2yFsyy7p NULL
+31 cvLH6Eat2yFsyy7p NULL
+31 cvLH6Eat2yFsyy7p NULL
+34 cvLH6Eat2yFsyy7p NULL
+34 cvLH6Eat2yFsyy7p NULL
+36 cvLH6Eat2yFsyy7p NULL
+36 cvLH6Eat2yFsyy7p NULL
+38 cvLH6Eat2yFsyy7p NULL
+38 cvLH6Eat2yFsyy7p NULL
+38 cvLH6Eat2yFsyy7p NULL
+39 cvLH6Eat2yFsyy7p NULL
+40 cvLH6Eat2yFsyy7p NULL
+40 cvLH6Eat2yFsyy7p NULL
+41 cvLH6Eat2yFsyy7p NULL
+43 cvLH6Eat2yFsyy7p NULL
+46 cvLH6Eat2yFsyy7p NULL
+51 cvLH6Eat2yFsyy7p NULL
+51 cvLH6Eat2yFsyy7p NULL
+53 cvLH6Eat2yFsyy7p NULL
+53 cvLH6Eat2yFsyy7p NULL
+61 cvLH6Eat2yFsyy7p NULL
+61 cvLH6Eat2yFsyy7p NULL
+61 cvLH6Eat2yFsyy7p NULL
+62 cvLH6Eat2yFsyy7p NULL
+PREHOOK: query: select * from text_llap100 order by cint, cstring1, cstring2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+POSTHOOK: query: select * from text_llap100 order by cint, cstring1, cstring2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+-50 -13326 528534767 NULL -50.0 -13326.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:46.674 1969-12-31 16:00:08.875 true NULL
+NULL -4213 528534767 NULL NULL -4213.0 cvLH6Eat2yFsyy7p NULL NULL 1969-12-31 16:00:13.589 true NULL
+-28 -15813 528534767 NULL -28.0 -15813.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:55.787 1969-12-31 16:00:01.546 true NULL
+31 -9566 528534767 NULL 31.0 -9566.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:44.187 1969-12-31 16:00:06.961 true NULL
+-34 15007 528534767 NULL -34.0 15007.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:50.434 1969-12-31 16:00:13.352 true NULL
+29 7021 528534767 NULL 29.0 7021.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:15.007 1969-12-31 16:00:15.148 true NULL
+31 4963 528534767 NULL 31.0 4963.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:07.021 1969-12-31 16:00:02.997 true NULL
+27 -7824 528534767 NULL 27.0 -7824.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:04.963 1969-12-31 15:59:56.474 true NULL
+-11 -15431 528534767 NULL -11.0 -15431.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.176 1969-12-31 16:00:07.787 true NULL
+61 -15549 528534767 NULL 61.0 -15549.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:44.569 1969-12-31 15:59:51.665 true NULL
+16 5780 528534767 NULL 16.0 5780.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:44.451 1969-12-31 16:00:12.752 true NULL
+5 14625 528534767 NULL 5.0 14625.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:05.78 1969-12-31 16:00:15.34 true NULL
+-23 13026 528534767 NULL -23.0 13026.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:14.625 1969-12-31 16:00:10.77 true NULL
+-51 -12083 528534767 NULL -51.0 -12083.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:13.026 1969-12-31 16:00:02.52 true NULL
+-11 9472 528534767 NULL -11.0 9472.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:47.917 1969-12-31 16:00:03.716 true NULL
+-48 -7735 528534767 NULL -48.0 -7735.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:09.472 1969-12-31 16:00:00.8 true NULL
+-62 10 528534767 NULL -62.0 10.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.265 1969-12-31 15:59:56.584 true NULL
+-45 5521 528534767 NULL -45.0 5521.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:00.01 1969-12-31 15:59:48.553 true NULL
+40 -1724 528534767 NULL 40.0 -1724.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:05.521 1969-12-31 15:59:57.835 true NULL
+39 -10909 528534767 NULL 39.0 -10909.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:58.276 1969-12-31 16:00:12.738 true NULL
+-32 11242 528534767 NULL -32.0 11242.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:49.091 1969-12-31 15:59:55.681 true NULL
+-56 8353 528534767 NULL -56.0 8353.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:11.242 1969-12-31 15:59:46.526 true NULL
+-7 2541 528534767 NULL -7.0 2541.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:08.353 1969-12-31 15:59:57.374 true NULL
+24 4432 528534767 NULL 24.0 4432.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:02.541 1969-12-31 16:00:10.895 true NULL
+36 -15912 528534767 NULL 36.0 -15912.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:04.432 1969-12-31 16:00:04.376 true NULL
+-23 -10154 528534767 NULL -23.0 -10154.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:44.088 1969-12-31 15:59:56.086 true NULL
+-55 -7449 528534767 NULL -55.0 -7449.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:49.846 1969-12-31 15:59:55.75 true NULL
+-11 7476 528534767 NULL -11.0 7476.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.551 1969-12-31 15:59:57.567 true NULL
+51 -4490 528534767 NULL 51.0 -4490.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:07.476 1969-12-31 15:59:49.318 true NULL
+-24 163 528534767 NULL -24.0 163.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:55.51 1969-12-31 16:00:04.014 true NULL
+-44 -1299 528534767 NULL -44.0 -1299.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:00.163 1969-12-31 15:59:47.687 true NULL
+8 7860 528534767 NULL 8.0 7860.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:58.701 1969-12-31 16:00:01.97 true NULL
+24 -4812 528534767 NULL 24.0 -4812.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:07.86 1969-12-31 15:59:55 true NULL
+4 -14739 528534767 NULL 4.0 -14739.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:55.188 1969-12-31 16:00:15.26 true NULL
+-57 -11492 528534767 NULL -57.0 -11492.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:45.261 1969-12-31 16:00:05.306 true NULL
+-22 3856 528534767 NULL -22.0 3856.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:48.508 1969-12-31 15:59:54.534 true NULL
+28 8035 528534767 NULL 28.0 8035.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:03.856 1969-12-31 15:59:55.95 true NULL
+-16 -7964 528534767 NULL -16.0 -7964.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:08.035 1969-12-31 16:00:12.464 true NULL
+46 6958 528534767 NULL 46.0 6958.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.036 1969-12-31 16:00:10.191 true NULL
+29 -1990 528534767 NULL 29.0 -1990.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:06.958 1969-12-31 15:59:52.902 true NULL
+-56 8402 528534767 NULL -56.0 8402.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:58.01 1969-12-31 16:00:05.146 true NULL
+-16 -6922 528534767 NULL -16.0 -6922.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:08.402 1969-12-31 15:59:50.561 true NULL
+38 -6583 528534767 NULL 38.0 -6583.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:53.078 1969-12-31 16:00:06.722 true NULL
+-54 -10268 528534767 NULL -54.0 -10268.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:53.417 1969-12-31 16:00:00.687 true NULL
+-23 4587 528534767 NULL -23.0 4587.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:49.732 1969-12-31 15:59:48.52 true NULL
+-19 1206 528534767 NULL -19.0 1206.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:04.587 1969-12-31 16:00:08.381 true NULL
+40 -7984 528534767 NULL 40.0 -7984.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:01.206 1969-12-31 16:00:02.59 true NULL
+62 6557 528534767 NULL 62.0 6557.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.016 1969-12-31 16:00:00.367 true NULL
+-34 4181 528534767 NULL -34.0 4181.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:06.557 1969-12-31 16:00:04.869 true NULL
+53 -10129 528534767 NULL 53.0 -10129.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:04.181 1969-12-31 16:00:08.061 true NULL
+51 -15790 528534767 NULL 51.0 -15790.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:49.871 1969-12-31 15:59:57.821 true NULL
+-4 2617 528534767 NULL -4.0 2617.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:44.21 1969-12-31 15:59:44.733 true NULL
+61 12161 528534767 NULL 61.0 12161.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:02.617 1969-12-31 16:00:10.536 true NULL
+19 7952 528534767 NULL 19.0 7952.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:12.161 1969-12-31 16:00:00.95 true NULL
+-33 7350 528534767 NULL -33.0 7350.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:07.952 1969-12-31 15:59:48.183 true NULL
+53 -12171 528534767 NULL 53.0 -12171.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:07.35 1969-12-31 15:59:57.549 true NULL
+18 -3045 528534767 NULL 18.0 -3045.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:47.829 1969-12-31 16:00:05.045 true NULL
+30 -814 528534767 NULL 30.0 -814.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:56.955 1969-12-31 16:00:11.799 true NULL
+-36 1639 528534767 NULL -36.0 1639.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:00.186 1969-12-31 16:00:13.098 true NULL
+34 -15059 528534767 NULL 34.0 -15059.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:01.639 1969-12-31 16:00:13.206 true NULL
+-55 -7353 528534767 NULL -55.0 -7353.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:44.941 1969-12-31 15:59:54.268 true NULL
+-40 -4463 528534767 NULL -40.0 -4463.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.647 1969-12-31 15:59:46.254 true NULL
+21 11737 528534767 NULL 21.0 11737.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:55.537 1969-12-31 15:59:45.022 true NULL
+61 -1254 528534767 NULL 61.0 -1254.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:11.737 1969-12-31 16:00:12.004 true NULL
+-59 10688 528534767 NULL -59.0 10688.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:58.746 1969-12-31 16:00:15.489 true NULL
+0 -3166 528534767 NULL 0.0 -3166.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:10.688 1969-12-31 16:00:01.385 true NULL
+-21 3168 528534767 NULL -21.0 3168.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:56.834 1969-12-31 16:00:13.331 true NULL
+-33 14072 528534767 NULL -33.0 14072.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:03.168 1969-12-31 15:59:55.836 true NULL
+-30 834 528534767 NULL -30.0 834.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:14.072 1969-12-31 16:00:03.004 true NULL
+-5 -13229 528534767 NULL -5.0 -13229.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:00.834 1969-12-31 16:00:00.388 true NULL
+-53 -3419 528534767 NULL -53.0 -3419.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:46.771 1969-12-31 15:59:53.744 true NULL
+34 -4255 528534767 NULL 34.0 -4255.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:56.581 1969-12-31 15:59:57.88 true NULL
+-5 12422 528534767 NULL -5.0 12422.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:55.745 1969-12-31 15:59:48.802 true NULL
+27 -14965 528534767 NULL 27.0 -14965.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:12.422 1969-12-31 16:00:09.517 true NULL
+NULL -3012 528534767 NULL NULL -3012.0 cvLH6Eat2yFsyy7p NULL NULL 1969-12-31 16:00:03.756 true NULL
+-21 -7183 528534767 NULL -21.0 -7183.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:45.035 1969-12-31 16:00:06.182 true NULL
+43 1475 528534767 NULL 43.0 1475.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:56.988 1969-12-31 16:00:03.442 true NULL
+41 37 528534767 NULL 41.0 37.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:52.817 1969-12-31 15:59:53.672 true NULL
+-28 6453 528534767 NULL -28.0 6453.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:01.475 1969-12-31 16:00:07.828 true NULL
+-5 -14379 528534767 NULL -5.0 -14379.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:00.037 1969-12-31 15:59:49.141 true NULL
+13 1358 528534767 NULL 13.0 1358.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:06.453 1969-12-31 16:00:00.423 true NULL
+-45 -14072 528534767 NULL -45.0 -14072.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:45.621 1969-12-31 15:59:45.914 true NULL
+10 9366 528534767 NULL 10.0 9366.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:01.358 1969-12-31 15:59:50.592 true NULL
+-22 77 528534767 NULL -22.0 77.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:45.928 1969-12-31 15:59:43.621 true NULL
+38 -4667 528534767 NULL 38.0 -4667.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:09.366 1969-12-31 15:59:52.334 true NULL
+-48 13300 528534767 NULL -48.0 13300.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:00.077 1969-12-31 15:59:45.827 true NULL
+2 1345 528534767 NULL 2.0 1345.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:55.333 1969-12-31 16:00:00.517 true NULL
+-37 -12472 528534767 NULL -37.0 -12472.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:13.3 1969-12-31 15:59:55.998 true NULL
+-43 486 528534767 NULL -43.0 486.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:01.345 1969-12-31 15:59:52.667 true NULL
+36 14907 528534767 NULL 36.0 14907.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:47.528 1969-12-31 15:59:47.206 true NULL
+-1 -75 528534767 NULL -1.389 -863.257 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:49.331 1969-12-31 16:00:07.585 true NULL
+-12 -2013 528534767 NULL -12.0 -2013.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:14.907 1969-12-31 15:59:58.789 true NULL
+0 15626 528534767 NULL 0.0 15626.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:09.566 1969-12-31 16:00:15.217 true NULL
+26 3961 528534767 NULL 26.0 3961.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:57.987 1969-12-31 15:59:52.232 true NULL
+-22 8499 528534767 NULL -22.0 8499.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:15.626 1969-12-31 16:00:10.923 true NULL
+9 9169 528534767 NULL 9.0 9169.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:03.961 1969-12-31 16:00:14.126 true NULL
+-13 -13372 528534767 NULL -13.0 -13372.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:08.499 1969-12-31 15:59:48.221 true NULL
+38 -11320 528534767 NULL 38.0 -11320.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 16:00:09.169 1969-12-31 16:00:03.822 true NULL
+-4 -1027 528534767 NULL -4.0 -1027.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:46.628 1969-12-31 16:00:11.413 true NULL
+-47 -2468 528534767 NULL -47.0 -2468.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:48.68 1969-12-31 16:00:02.94 true NULL
+PREHOOK: query: select csmallint, cstring1, cboolean2 from text_llap100 order by csmallint, cstring1, cboolean2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+POSTHOOK: query: select csmallint, cstring1, cboolean2 from text_llap100 order by csmallint, cstring1, cboolean2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+-15912 cvLH6Eat2yFsyy7p NULL
+-15813 cvLH6Eat2yFsyy7p NULL
+-15790 cvLH6Eat2yFsyy7p NULL
+-15549 cvLH6Eat2yFsyy7p NULL
+-15431 cvLH6Eat2yFsyy7p NULL
+-15059 cvLH6Eat2yFsyy7p NULL
+-14965 cvLH6Eat2yFsyy7p NULL
+-14739 cvLH6Eat2yFsyy7p NULL
+-14379 cvLH6Eat2yFsyy7p NULL
+-14072 cvLH6Eat2yFsyy7p NULL
+-13372 cvLH6Eat2yFsyy7p NULL
+-13326 cvLH6Eat2yFsyy7p NULL
+-13229 cvLH6Eat2yFsyy7p NULL
+-12472 cvLH6Eat2yFsyy7p NULL
+-12171 cvLH6Eat2yFsyy7p NULL
+-12083 cvLH6Eat2yFsyy7p NULL
+-11492 cvLH6Eat2yFsyy7p NULL
+-11320 cvLH6Eat2yFsyy7p NULL
+-10909 cvLH6Eat2yFsyy7p NULL
+-10268 cvLH6Eat2yFsyy7p NULL
+-10154 cvLH6Eat2yFsyy7p NULL
+-10129 cvLH6Eat2yFsyy7p NULL
+-9566 cvLH6Eat2yFsyy7p NULL
+-7984 cvLH6Eat2yFsyy7p NULL
+-7964 cvLH6Eat2yFsyy7p NULL
+-7824 cvLH6Eat2yFsyy7p NULL
+-7735 cvLH6Eat2yFsyy7p NULL
+-7449 cvLH6Eat2yFsyy7p NULL
+-7353 cvLH6Eat2yFsyy7p NULL
+-7183 cvLH6Eat2yFsyy7p NULL
+-6922 cvLH6Eat2yFsyy7p NULL
+-6583 cvLH6Eat2yFsyy7p NULL
+-4812 cvLH6Eat2yFsyy7p NULL
+-4667 cvLH6Eat2yFsyy7p NULL
+-4490 cvLH6Eat2yFsyy7p NULL
+-4463 cvLH6Eat2yFsyy7p NULL
+-4255 cvLH6Eat2yFsyy7p NULL
+-4213 cvLH6Eat2yFsyy7p NULL
+-3419 cvLH6Eat2yFsyy7p NULL
+-3166 cvLH6Eat2yFsyy7p NULL
+-3045 cvLH6Eat2yFsyy7p NULL
+-3012 cvLH6Eat2yFsyy7p NULL
+-2468 cvLH6Eat2yFsyy7p NULL
+-2013 cvLH6Eat2yFsyy7p NULL
+-1990 cvLH6Eat2yFsyy7p NULL
+-1724 cvLH6Eat2yFsyy7p NULL
+-1299 cvLH6Eat2yFsyy7p NULL
+-1254 cvLH6Eat2yFsyy7p NULL
+-1027 cvLH6Eat2yFsyy7p NULL
+-814 cvLH6Eat2yFsyy7p NULL
+-75 cvLH6Eat2yFsyy7p NULL
+10 cvLH6Eat2yFsyy7p NULL
+37 cvLH6Eat2yFsyy7p NULL
+77 cvLH6Eat2yFsyy7p NULL
+163 cvLH6Eat2yFsyy7p NULL
+486 cvLH6Eat2yFsyy7p NULL
+834 cvLH6Eat2yFsyy7p NULL
+1206 cvLH6Eat2yFsyy7p NULL
+1345 cvLH6Eat2yFsyy7p NULL
+1358 cvLH6Eat2yFsyy7p NULL
+1475 cvLH6Eat2yFsyy7p NULL
+1639 cvLH6Eat2yFsyy7p NULL
+2541 cvLH6Eat2yFsyy7p NULL
+2617 cvLH6Eat2yFsyy7p NULL
+3168 cvLH6Eat2yFsyy7p NULL
+3856 cvLH6Eat2yFsyy7p NULL
+3961 cvLH6Eat2yFsyy7p NULL
+4181 cvLH6Eat2yFsyy7p NULL
+4432 cvLH6Eat2yFsyy7p NULL
+4587 cvLH6Eat2yFsyy7p NULL
+4963 cvLH6Eat2yFsyy7p NULL
+5521 cvLH6Eat2yFsyy7p NULL
+5780 cvLH6Eat2yFsyy7p NULL
+6453 cvLH6Eat2yFsyy7p NULL
+6557 cvLH6Eat2yFsyy7p NULL
+6958 cvLH6Eat2yFsyy7p NULL
+7021 cvLH6Eat2yFsyy7p NULL
+7350 cvLH6Eat2yFsyy7p NULL
+7476 cvLH6Eat2yFsyy7p NULL
+7860 cvLH6Eat2yFsyy7p NULL
+7952 cvLH6Eat2yFsyy7p NULL
+8035 cvLH6Eat2yFsyy7p NULL
+8353 cvLH6Eat2yFsyy7p NULL
+8402 cvLH6Eat2yFsyy7p NULL
+8499 cvLH6Eat2yFsyy7p NULL
+9169 cvLH6Eat2yFsyy7p NULL
+9366 cvLH6Eat2yFsyy7p NULL
+9472 cvLH6Eat2yFsyy7p NULL
+10688 cvLH6Eat2yFsyy7p NULL
+11242 cvLH6Eat2yFsyy7p NULL
+11737 cvLH6Eat2yFsyy7p NULL
+12161 cvLH6Eat2yFsyy7p NULL
+12422 cvLH6Eat2yFsyy7p NULL
+13026 cvLH6Eat2yFsyy7p NULL
+13300 cvLH6Eat2yFsyy7p NULL
+14072 cvLH6Eat2yFsyy7p NULL
+14625 cvLH6Eat2yFsyy7p NULL
+14907 cvLH6Eat2yFsyy7p NULL
+15007 cvLH6Eat2yFsyy7p NULL
+15626 cvLH6Eat2yFsyy7p NULL
+PREHOOK: query: DROP TABLE text_llap
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@text_llap
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: DROP TABLE text_llap
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@text_llap
+POSTHOOK: Output: default@text_llap
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
index b84aeb5..a9263ec 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
@@ -207,4 +207,10 @@ public class DiskRangeList extends DiskRange {
return result;
}
}
+
+ public void setEnd(long newEnd) {
+ assert newEnd >= this.offset;
+ assert this.next == null || this.next.offset >= newEnd;
+ this.end = newEnd;
+ }
}
\ No newline at end of file
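The new setEnd method lets a caller adjust a range's end in place while asserting the list invariants: the end may not move before the range's own offset, and may not overlap into the next range. A hedged usage sketch, assuming the getOffset/getEnd accessors that DiskRange exposes:

    // Sketch: truncate 'range' so it stops at 'limit', relying on setEnd's invariant checks.
    static void truncateTo(DiskRangeList range, long limit) {
      if (limit >= range.getOffset() && limit < range.getEnd()) {
        range.setEnd(limit); // asserts limit >= offset and next (if any) starts at or after limit
      }
    }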
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
index 13772c9..b894c11e 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
@@ -22,6 +22,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A block of data for a given section of a file, similar to VRB but in encoded form.
* Stores a set of buffers for each encoded stream that is a part of each column.
@@ -72,6 +75,19 @@ public class EncodedColumnBatch<BatchKey> {
public void setIndexBaseOffset(int indexBaseOffset) {
this.indexBaseOffset = indexBaseOffset;
}
+
+ @Override
+ public String toString() {
+ String bufStr = "";
+ if (cacheBuffers != null) {
+ for (MemoryBuffer mb : cacheBuffers) {
+ bufStr += mb.getClass().getSimpleName() + " with " + mb.getByteBufferRaw().remaining() + " bytes, ";
+ }
+ }
+ return "ColumnStreamData [cacheBuffers=[" + bufStr
+ + "], indexBaseOffset=" + indexBaseOffset + "]";
+ }
+
}
/** The key that is used to map this batch to source location. */
@@ -104,6 +120,7 @@ public class EncodedColumnBatch<BatchKey> {
}
}
+ private static final Logger LOG = LoggerFactory.getLogger(EncodedColumnBatch.class);
public void setStreamData(int colIx, int streamIx, ColumnStreamData csd) {
assert hasData[colIx];
columnData[colIx][streamIx] = csd;
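The new toString above builds the buffer summary with String concatenation in a loop; an equivalent formulation with StringBuilder (a sketch only, using just the fields shown in this hunk) avoids the intermediate strings when a batch carries many buffers:

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder("ColumnStreamData [cacheBuffers=[");
      if (cacheBuffers != null) {
        for (MemoryBuffer mb : cacheBuffers) {
          // Same per-buffer description as the committed version.
          sb.append(mb.getClass().getSimpleName())
              .append(" with ").append(mb.getByteBufferRaw().remaining()).append(" bytes, ");
        }
      }
      return sb.append("], indexBaseOffset=").append(indexBaseOffset).append("]").toString();
    }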
[3/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
new file mode 100644
index 0000000..e84f5cc
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+class LlapRecordReader
+ implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
+
+ private final FileSplit split;
+ private final List<Integer> columnIds;
+ private final SearchArgument sarg;
+ private final String[] columnNames;
+ private final VectorizedRowBatchCtx rbCtx;
+ private final Object[] partitionValues;
+
+ private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
+ private ColumnVectorBatch lastCvb = null;
+ private boolean isFirst = true;
+
+ private Throwable pendingError = null;
+ /** Vector that is currently being processed by our user. */
+ private boolean isDone = false;
+ private final boolean isClosed = false;
+ private final ConsumerFeedback<ColumnVectorBatch> feedback;
+ private final QueryFragmentCounters counters;
+ private long firstReturnTime;
+
+ private final JobConf jobConf;
+ private final boolean[] includedColumns;
+ private final ReadPipeline rp;
+ private final ExecutorService executor;
+
+ private TypeDescription fileSchema;
+
+ public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
+ String hostName, ColumnVectorProducer cvp, ExecutorService executor,
+ InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter)
+ throws IOException, HiveException {
+ this.executor = executor;
+ this.jobConf = job;
+ this.split = split;
+ this.columnIds = includedCols;
+ this.sarg = ConvertAstToSearchArg.createFromConf(job);
+ this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
+ final String fragmentId = LlapTezUtils.getFragmentId(job);
+ final String dagId = LlapTezUtils.getDagId(job);
+ final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+ MDC.put("dagId", dagId);
+ MDC.put("queryId", queryId);
+ TezCounters taskCounters = null;
+ if (fragmentId != null) {
+ MDC.put("fragmentId", fragmentId);
+ taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
+ LOG.info("Received fragment id: {}", fragmentId);
+ } else {
+ LOG.warn("Not using tez counters as fragment id string is null");
+ }
+ this.counters = new QueryFragmentCounters(job, taskCounters);
+ this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
+
+ MapWork mapWork = Utilities.getMapWork(job);
+ VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
+ rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
+
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
+ } else {
+ partitionValues = null;
+ }
+
+ boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+ TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
+ job, isAcidScan, Integer.MAX_VALUE);
+
+ // Create the consumer of encoded data; it will coordinate decoding to CVBs.
+ feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames,
+ counters, schema, sourceInputFormat, sourceSerDe, reporter, job);
+ fileSchema = rp.getFileSchema();
+ includedColumns = rp.getIncludedColumns();
+ }
+
+ /**
+ * Starts the data read pipeline
+ */
+ public boolean init() {
+ if (!checkOrcSchemaEvolution()) return false;
+
+ // perform the data read asynchronously
+ if (executor instanceof StatsRecordingThreadPool) {
+ // Every thread created by this thread pool will use the same handler
+ ((StatsRecordingThreadPool) executor).setUncaughtExceptionHandler(
+ new IOUncaughtExceptionHandler());
+ }
+ executor.submit(rp.getReadCallable());
+ return true;
+ }
+
+ private boolean checkOrcSchemaEvolution() {
+ SchemaEvolution schemaEvolution = new SchemaEvolution(
+ fileSchema, rp.getReaderSchema(), includedColumns);
+ for (Integer colId : columnIds) {
+ if (!schemaEvolution.isPPDSafeConversion(colId)) {
+ LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+ assert value != null;
+ if (isClosed) {
+ throw new AssertionError("next called after close");
+ }
+ // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+ boolean wasFirst = isFirst;
+ if (isFirst) {
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(value, partitionValues);
+ }
+ isFirst = false;
+ }
+ ColumnVectorBatch cvb = null;
+ try {
+ cvb = nextCvb();
+ } catch (InterruptedException e) {
+ // Query might have been canceled. Stop the background processing.
+ feedback.stop();
+ throw new IOException(e);
+ }
+ if (cvb == null) {
+ if (wasFirst) {
+ firstReturnTime = counters.startTimeCounter();
+ }
+ counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
+ return false;
+ }
+ if (columnIds.size() != cvb.cols.length) {
+ throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
+ + " included, but the reader returned " + cvb.cols.length);
+ }
+ // VRB was created from VrbCtx, so we already have pre-allocated column vectors
+ for (int i = 0; i < cvb.cols.length; ++i) {
+ // Return old CVs (if any) to caller. We assume these things all have the same schema.
+ cvb.swapColumnVector(i, value.cols, columnIds.get(i));
+ }
+ value.selectedInUse = false;
+ value.size = cvb.size;
+ if (wasFirst) {
+ firstReturnTime = counters.startTimeCounter();
+ }
+ return true;
+ }
+
+ public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
+ return rbCtx;
+ }
+
+ private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
+ " Message: {}", t.getName(), t.getId(), e.getMessage());
+ setError(e);
+ }
+ }
+
+ ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
+ boolean isFirst = (lastCvb == null);
+ if (!isFirst) {
+ feedback.returnData(lastCvb);
+ }
+ synchronized (pendingData) {
+ // We are waiting for next block. Either we will get it, or be told we are done.
+ boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
+ if (doLogBlocking) {
+ LlapIoImpl.LOG.trace("next will block");
+ }
+ while (isNothingToReport()) {
+ pendingData.wait(100);
+ }
+ if (doLogBlocking) {
+ LlapIoImpl.LOG.trace("next is unblocked");
+ }
+ rethrowErrorIfAny();
+ lastCvb = pendingData.poll();
+ }
+ if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
+ LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
+ }
+ return lastCvb;
+ }
+
+ private boolean isNothingToReport() {
+ return !isDone && pendingData.isEmpty() && pendingError == null;
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public VectorizedRowBatch createValue() {
+ return rbCtx.createVectorizedRowBatch();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
+ isClosed, isDone, pendingError, pendingData.size());
+ }
+ LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
+ feedback.stop();
+ rethrowErrorIfAny();
+ MDC.clear();
+ }
+
+ private void rethrowErrorIfAny() throws IOException {
+ if (pendingError == null) return;
+ if (pendingError instanceof IOException) {
+ throw (IOException)pendingError;
+ }
+ throw new IOException(pendingError);
+ }
+
+ @Override
+ public void setDone() {
+ if (LlapIoImpl.LOG.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("setDone called; closed {}, done {}, err {}, pending {}",
+ isClosed, isDone, pendingError, pendingData.size());
+ }
+ synchronized (pendingData) {
+ isDone = true;
+ pendingData.notifyAll();
+ }
+ }
+
+ @Override
+ public void consumeData(ColumnVectorBatch data) {
+ if (LlapIoImpl.LOG.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("consume called; closed {}, done {}, err {}, pending {}",
+ isClosed, isDone, pendingError, pendingData.size());
+ }
+ synchronized (pendingData) {
+ if (isClosed) {
+ return;
+ }
+ pendingData.add(data);
+ pendingData.notifyAll();
+ }
+ }
+
+ @Override
+ public void setError(Throwable t) {
+ counters.incrCounter(LlapIOCounters.NUM_ERRORS);
+ LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
+ isClosed, isDone, pendingError, pendingData.size());
+ assert t != null;
+ synchronized (pendingData) {
+ pendingError = t;
+ pendingData.notifyAll();
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ // TODO: plumb progress info thru the reader if we can get metadata from loader first.
+ return 0.0f;
+ }
+}
\ No newline at end of file
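LlapRecordReader implements the standard mapred RecordReader contract, so after init() starts the asynchronous read pipeline a caller drives it with the usual create/next/close loop. A hedged sketch of that consumption pattern (purely illustrative; the class is package-private and is constructed elsewhere in this patch):

    // Sketch of how a caller consumes the reader; 'process' is a hypothetical downstream consumer.
    static void drain(LlapRecordReader reader) throws IOException {
      if (!reader.init()) {
        return; // schema evolution check failed; LLAP IO is disabled for this split
      }
      NullWritable key = reader.createKey();
      VectorizedRowBatch batch = reader.createValue();
      try {
        while (reader.next(key, batch)) {
          // batch.size rows are ready; column vectors were swapped in by next().
          process(batch);
        }
      } finally {
        reader.close(); // stops the feedback pipeline and rethrows any pending error
      }
    }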
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index db86296..2e4f2ba 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
/**
@@ -33,7 +37,8 @@ import org.apache.orc.TypeDescription;
*/
public interface ColumnVectorProducer {
ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
- List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
- QueryFragmentCounters counters,
- TypeDescription readerSchema) throws IOException;
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ QueryFragmentCounters counters, TypeDescription readerSchema,
+ InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter,
+ JobConf job) throws IOException;
}
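The widened createReadPipeline signature carries the source InputFormat, Deserializer, Reporter, and JobConf so that serde-based (non-ORC) pipelines can be built as well. A sketch of a call site, mirroring the arguments LlapRecordReader passes earlier in this patch:

    // Illustrative call only; argument order follows the interface declaration above.
    ReadPipeline pipeline = cvp.createReadPipeline(
        consumer,          // Consumer<ColumnVectorBatch> receiving decoded batches
        split,             // FileSplit being read
        columnIds,         // projected column ids
        sarg,              // SearchArgument for predicate pushdown (may be null)
        columnNames,       // projected column names
        counters,          // QueryFragmentCounters for this fragment
        readerSchema,      // desired reader TypeDescription
        sourceInputFormat, // original (non-LLAP) InputFormat for the underlying data
        sourceSerDe,       // Deserializer for text/serde-backed formats
        reporter,          // mapred Reporter
        job);              // per-task JobConf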
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 6b54b30..04fed44 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.TypeDescription;
public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
implements Consumer<BatchType>, ReadPipeline {
@@ -122,4 +123,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
// We are just a relay; send unpause to encoded data producer.
upstreamFeedback.unpause();
}
+
+ @Override
+ public TypeDescription getFileSchema() {
+ // TODO: the ORC-specific method should be removed from the interface instead.
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
new file mode 100644
index 0000000..d384b85
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.decode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public class GenericColumnVectorProducer implements ColumnVectorProducer {
+ private final SerDeLowLevelCacheImpl cache;
+ private final BufferUsageManager bufferManager;
+ private final Configuration conf;
+ private final LlapDaemonCacheMetrics cacheMetrics;
+ private final LlapDaemonIOMetrics ioMetrics;
+
+ public GenericColumnVectorProducer(SerDeLowLevelCacheImpl serdeCache,
+ BufferUsageManager bufferManager, Configuration conf, LlapDaemonCacheMetrics cacheMetrics,
+ LlapDaemonIOMetrics ioMetrics) {
+ LlapIoImpl.LOG.info("Initializing ORC column vector producer");
+ this.cache = serdeCache;
+ this.bufferManager = bufferManager;
+ this.conf = conf;
+ this.cacheMetrics = cacheMetrics;
+ this.ioMetrics = ioMetrics;
+ }
+
+ @Override
+ public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ QueryFragmentCounters counters, TypeDescription schema, InputFormat<?, ?> sourceInputFormat,
+ Deserializer sourceSerDe, Reporter reporter, JobConf job) throws IOException {
+ cacheMetrics.incrCacheReadRequests();
+ OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
+ consumer, columnIds.size(), false, counters, ioMetrics);
+ TextFileMetadata fm;
+ try {
+ fm = new TextFileMetadata(sourceSerDe);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+ edc.setFileMetadata(fm);
+ // Note that we pass job config to the record reader, but use global config for LLAP IO.
+ SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache,
+ bufferManager, conf, split, columnIds, edc, job, reporter, sourceInputFormat,
+ sourceSerDe, counters, fm.getSchema());
+ edc.init(reader, reader);
+ LlapIoImpl.LOG.info("Ignoring schema: " + schema);
+ return edc;
+ }
+
+
+ public static final class TextStripeMetadata implements ConsumerStripeMetadata {
+ // The writer is local to the process.
+ private final String writerTimezone = TimeZone.getDefault().getID();
+ private List<ColumnEncoding> encodings;
+ private final int stripeIx;
+ private long rowCount = -1;
+
+ public TextStripeMetadata(int stripeIx) {
+ this.stripeIx = stripeIx;
+ }
+
+ @Override
+ public String getWriterTimezone() {
+ return writerTimezone;
+ }
+
+ @Override
+ public int getStripeIx() {
+ return stripeIx;
+ }
+
+ @Override
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public List<ColumnEncoding> getEncodings() {
+ return encodings;
+ }
+
+ @Override
+ public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setEncodings(List<ColumnEncoding> encodings) {
+ this.encodings = encodings;
+ }
+
+ @Override
+ public RowIndex[] getRowIndexes() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean supportsRowIndexes() {
+ return false;
+ }
+
+ public void setRowCount(long value) {
+ rowCount = value;
+ }
+
+ @Override
+ public String toString() {
+ return "[stripeIx=" + stripeIx + ", rowCount=" + rowCount + ", encodings=" + encodings + "]".replace('\n', ' ');
+ }
+ }
+
+
+ private static final class TextFileMetadata implements ConsumerFileMetadata {
+ private final List<Type> orcTypes = new ArrayList<>();
+ private final TypeDescription schema;
+ public TextFileMetadata(Deserializer sourceSerDe) throws SerDeException {
+ TypeDescription schema = OrcInputFormat.convertTypeInfo(
+ TypeInfoUtils.getTypeInfoFromObjectInspector(sourceSerDe.getObjectInspector()));
+ this.schema = schema;
+ addTypesFromSchema(schema);
+ }
+
+ private void addTypesFromSchema(TypeDescription schema) {
+ // The same thing that WriterImpl does when writing the footer, but w/o the footer.
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
+ orcTypes.add(type.build());
+ if (children == null) return;
+ for(TypeDescription child : children) {
+ addTypesFromSchema(child);
+ }
+ }
+
+ @Override
+ public List<Type> getTypes() {
+ return orcTypes;
+ }
+
+ @Override
+ public int getStripeCount() {
+ return 1;
+ }
+
+ @Override
+ public CompressionKind getCompressionKind() {
+ return CompressionKind.NONE;
+ }
+
+ @Override
+ public TypeDescription getSchema() {
+ return schema;
+ }
+ }
+}
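For orientation: the schema handling above reduces to a pre-order flattening of the TypeDescription tree into OrcProto.Type entries, in the same order an ORC file footer lists them. The sketch below is a minimal, standalone rendering of that flattening; it assumes only the OrcUtils.setTypeBuilderFromSchema call used above plus the standard TypeDescription builder methods, and the class and field names are illustrative rather than part of the patch.

import java.util.ArrayList;
import java.util.List;

import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;

public class SchemaFlatteningSketch {
  // Flattens the schema tree into the flat type list an ORC footer would contain.
  public static List<OrcProto.Type> flatten(TypeDescription root) {
    List<OrcProto.Type> out = new ArrayList<>();
    append(out, root);
    return out;
  }

  private static void append(List<OrcProto.Type> out, TypeDescription node) {
    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
    List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, node);
    out.add(type.build());
    if (children == null) return;
    for (TypeDescription child : children) {
      append(out, child);
    }
  }

  public static void main(String[] args) {
    // Stand-in for the schema TextFileMetadata derives from the source SerDe.
    TypeDescription schema = TypeDescription.createStruct()
        .addField("key", TypeDescription.createInt())
        .addField("value", TypeDescription.createString());
    System.out.println(flatten(schema)); // three entries: struct, int, string
  }
}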
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
new file mode 100644
index 0000000..7fecdaa
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.decode;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
+
+// TODO# VectorizedInputFormatInterface is a hack... only "vectorized" in LLAP IO.
+// How to resolve optimizer dependency?
+public class LlapTextInputFormat extends org.apache.hadoop.mapred.TextInputFormat
+ implements LlapWrappableInputFormatInterface, VectorizedInputFormatInterface {
+
+ @Override
+ public boolean isSerdeBased() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 2e9b9c3..565e3d2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -34,7 +34,11 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
public class OrcColumnVectorProducer implements ColumnVectorProducer {
@@ -63,12 +67,14 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
@Override
public ReadPipeline createReadPipeline(
- Consumer<ColumnVectorBatch> consumer, FileSplit split,
- List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
- QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+ Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds,
+ SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters,
+ TypeDescription readerSchema, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
+ Reporter reporter, JobConf job) throws IOException {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
_skipCorrupt, counters, ioMetrics);
+ // Note: we use global conf here and ignore JobConf.
OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
edc.init(reader, reader);
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 29f1ba8..eb40d1f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -16,16 +16,16 @@
package org.apache.hadoop.hive.llap.io.decode;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionCodec;
import org.apache.orc.impl.PositionProvider;
-import org.apache.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory.SettableTreeReader;
@@ -53,11 +52,12 @@ import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.TreeReaderFactory;
import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
import org.apache.orc.impl.TreeReaderFactory.TreeReader;
-import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
import org.apache.orc.OrcProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
public class OrcEncodedDataConsumer
extends EncodedDataConsumer<OrcBatchKey, OrcEncodedColumnBatch> {
@@ -65,9 +65,9 @@ public class OrcEncodedDataConsumer
private TreeReaderFactory.TreeReader[] columnReaders;
private int[] columnMapping; // Mapping from columnReaders (by index) to columns in file schema.
private int previousStripeIndex = -1;
- private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
+ private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file.
private CompressionCodec codec;
- private OrcStripeMetadata[] stripes;
+ private List<ConsumerStripeMetadata> stripes;
private final boolean skipCorrupt; // TODO: get rid of this
private final QueryFragmentCounters counters;
private boolean[] includedColumns;
@@ -82,16 +82,21 @@ public class OrcEncodedDataConsumer
this.counters = counters;
}
- public void setFileMetadata(OrcFileMetadata f) {
+ public void setFileMetadata(ConsumerFileMetadata f) {
assert fileMetadata == null;
fileMetadata = f;
- stripes = new OrcStripeMetadata[f.getStripes().size()];
- codec = PhysicalFsWriter.createCodec(fileMetadata.getCompressionKind());
+ stripes = new ArrayList<>(f.getStripeCount());
+ codec = PhysicalFsWriter.createCodec(f.getCompressionKind());
}
- public void setStripeMetadata(OrcStripeMetadata m) {
+ public void setStripeMetadata(ConsumerStripeMetadata m) {
assert stripes != null;
- stripes[m.getStripeIx()] = m;
+ int newIx = m.getStripeIx();
+ for (int i = stripes.size(); i <= newIx; ++i) {
+ stripes.add(null);
+ }
+ assert stripes.get(newIx) == null;
+ stripes.set(newIx, m);
}
@Override
@@ -103,17 +108,18 @@ public class OrcEncodedDataConsumer
boolean sameStripe = currentStripeIndex == previousStripeIndex;
try {
- OrcStripeMetadata stripeMetadata = stripes[currentStripeIndex];
+ ConsumerStripeMetadata stripeMetadata = stripes.get(currentStripeIndex);
// Get non null row count from root column, to get max vector batches
int rgIdx = batch.getBatchKey().rgIx;
long nonNullRowCount = -1;
if (rgIdx == OrcEncodedColumnBatch.ALL_RGS) {
nonNullRowCount = stripeMetadata.getRowCount();
} else {
- OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[0].getEntry(rgIdx);
+ OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexEntry(0, rgIdx);
nonNullRowCount = getRowCount(rowIndex);
}
int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
+ // LOG.info("TODO# expecting " + nonNullRowCount + " rows with at most " + maxBatchesRG);
int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
TypeDescription schema = fileMetadata.getSchema();
@@ -136,6 +142,7 @@ public class OrcEncodedDataConsumer
batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
if (batchSize == 0) break;
}
+ // LOG.info("TODO# batch " + i + " of " + batchSize);
ColumnVectorBatch cvb = cvbPool.take();
// assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
@@ -148,6 +155,7 @@ public class OrcEncodedDataConsumer
cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]), batchSize);
}
cvb.cols[idx].ensureSize(batchSize, false);
+ LOG.info("TODO# nextVector on " + idx + "; "+ reader + " for " + columnMapping[idx]);
reader.nextVector(cvb.cols[idx], null, batchSize);
}
@@ -155,6 +163,7 @@ public class OrcEncodedDataConsumer
downstreamConsumer.consumeData(cvb);
counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
}
+ LOG.info("TODO# done with decode");
counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
@@ -214,8 +223,9 @@ public class OrcEncodedDataConsumer
}
private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
- EncodedColumnBatch<OrcBatchKey> batch, OrcStripeMetadata stripeMetadata) throws IOException {
+ EncodedColumnBatch<OrcBatchKey> batch, ConsumerStripeMetadata stripeMetadata) throws IOException {
PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+ // LOG.info("TODO# positionInStreams pps " + Lists.newArrayList(pps));
if (pps == null) return;
for (int i = 0; i < columnReaders.length; i++) {
columnReaders[i].seek(pps);
@@ -224,8 +234,9 @@ public class OrcEncodedDataConsumer
private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe,
- OrcStripeMetadata stripeMetadata) throws IOException {
+ ConsumerStripeMetadata stripeMetadata) throws IOException {
PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+ // LOG.info("TODO# repositionInStreams pps " + Lists.newArrayList(pps));
if (pps == null) return;
for (int i = 0; i < columnReaders.length; i++) {
TreeReader reader = columnReaders[i];
@@ -240,20 +251,46 @@ public class OrcEncodedDataConsumer
}
}
- private PositionProvider[] createPositionProviders(TreeReaderFactory.TreeReader[] columnReaders,
- EncodedColumnBatch<OrcBatchKey> batch, OrcStripeMetadata stripeMetadata) throws IOException {
- if (columnReaders.length == 0) return null;
- int rowGroupIndex = batch.getBatchKey().rgIx;
- if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
- throw new IOException("Cannot position readers without RG information");
+ /**
+ * Position provider used in absence of indexes, e.g. for serde-based reader, where each stream
+ * is in its own physical 'container', always starting at 0, and there are no RGs.
+ */
+ private final static class IndexlessPositionProvider implements PositionProvider {
+ @Override
+ public long getNext() {
+ return 0;
}
- // TODO: this assumes indexes in getRowIndexes would match column IDs
- OrcProto.RowIndex[] ris = stripeMetadata.getRowIndexes();
- PositionProvider[] pps = new PositionProvider[ris.length];
- for (int i = 0; i < ris.length; ++i) {
- OrcProto.RowIndex ri = ris[i];
- if (ri == null) continue;
- pps[i] = new RecordReaderImpl.PositionProviderImpl(ri.getEntry(rowGroupIndex));
+
+ @Override
+ public String toString() {
+ return "indexes not supported";
+ }
+ }
+
+ private PositionProvider[] createPositionProviders(
+ TreeReaderFactory.TreeReader[] columnReaders, EncodedColumnBatch<OrcBatchKey> batch,
+ ConsumerStripeMetadata stripeMetadata) throws IOException {
+ if (columnReaders.length == 0) return null;
+ PositionProvider[] pps = null;
+ if (!stripeMetadata.supportsRowIndexes()) {
+ PositionProvider singleRgPp = new IndexlessPositionProvider();
+ pps = new PositionProvider[stripeMetadata.getEncodings().size()];
+ for (int i = 0; i < pps.length; ++i) {
+ pps[i] = singleRgPp;
+ }
+ } else {
+ int rowGroupIndex = batch.getBatchKey().rgIx;
+ if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
+ throw new IOException("Cannot position readers without RG information");
+ }
+ // TODO: this assumes indexes in getRowIndexes would match column IDs
+ OrcProto.RowIndex[] ris = stripeMetadata.getRowIndexes();
+ pps = new PositionProvider[ris.length];
+ for (int i = 0; i < ris.length; ++i) {
+ OrcProto.RowIndex ri = ris[i];
+ if (ri == null) continue;
+ pps[i] = new RecordReaderImpl.PositionProviderImpl(ri.getEntry(rowGroupIndex));
+ }
}
return pps;
}
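The split above is what lets serde-based data reuse the ORC tree readers: without row-group indexes every stream lives in its own buffer starting at offset 0, so a constant provider is enough, while ORC data replays the positions recorded in the RowIndexEntry. As a rough illustration (the class name is made up, not from the patch), an index-backed provider is essentially an array replayed through getNext():

import org.apache.orc.impl.PositionProvider;

// Replays a fixed list of stream positions, the way
// RecordReaderImpl.PositionProviderImpl walks a RowIndexEntry.
final class ArrayPositionProvider implements PositionProvider {
  private final long[] positions;
  private int next = 0;

  ArrayPositionProvider(long[] positions) {
    this.positions = positions;
  }

  @Override
  public long getNext() {
    return positions[next++];
  }
}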
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
index 4e1b851..36f6c9c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
@@ -25,7 +25,7 @@ import org.apache.orc.TypeDescription;
public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> {
public Callable<Void> getReadCallable();
- TypeDescription getFileSchema();
+ TypeDescription getFileSchema(); // TODO: this is ORC-specific and should be removed
TypeDescription getReaderSchema();
boolean[] getIncludedColumns();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
new file mode 100644
index 0000000..a70545e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -0,0 +1,1248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.StripeData;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.TextStripeMetadata;
+import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcFile.EncodingStrategy;
+import org.apache.orc.OrcFile.Version;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.OutStream.OutputReceiver;
+import org.apache.orc.impl.PhysicalWriter;
+import org.apache.orc.impl.StreamName;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class SerDeEncodedDataReader extends CallableWithNdc<Void>
+ implements ConsumerFeedback<OrcEncodedColumnBatch> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SerDeEncodedDataReader.class);
+ public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+ new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ public ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ public void resetBeforeOffer(ColumnStreamData t) {
+ t.reset();
+ }
+ });
+ public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+ @Override
+ public OrcEncodedColumnBatch create() {
+ return new OrcEncodedColumnBatch();
+ }
+ @Override
+ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+ t.reset();
+ }
+ });
+ public static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+ @Override
+ public CacheChunk create() {
+ return new CacheChunk();
+ }
+ @Override
+ public void resetBeforeOffer(CacheChunk t) {
+ t.reset();
+ }
+ });
+ private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+ @Override
+ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, offset, end);
+ return tcc;
+ }
+ };
+
+
+ private final SerDeLowLevelCacheImpl cache;
+ private final BufferUsageManager bufferManager;
+ private final Configuration conf;
+ private final FileSplit split;
+ private List<Integer> columnIds;
+ private final OrcEncodedDataConsumer consumer;
+ private final QueryFragmentCounters counters;
+ private final UserGroupInformation ugi;
+
+ private final Object fileKey;
+ private final FileSystem fs;
+
+ private volatile boolean isStopped = false;
+ private final Deserializer sourceSerDe;
+ private final InputFormat<?, ?> sourceInputFormat;
+ private final Reporter reporter;
+ private final JobConf jobConf;
+ private final int allocSize;
+ private final int targetSliceRowCount;
+
+ private final boolean[] writerIncludes;
+ private WriterImpl orcWriter = null;
+ private CacheWriter cacheWriter = null;
+ /**
+ * Data from cache currently being processed. We store it here so that we could decref
+ * it in case of failures. We remove each slice from the data after it has been sent to
+ * the consumer, at which point the consumer is responsible for it.
+ */
+ private FileData cachedData;
+
+ public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
+ BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
+ List<Integer> columnIds, OrcEncodedDataConsumer consumer,
+ JobConf jobConf, Reporter reporter, InputFormat<?, ?> sourceInputFormat,
+ Deserializer sourceSerDe, QueryFragmentCounters counters, TypeDescription schema)
+ throws IOException {
+ this.cache = cache;
+ this.bufferManager = bufferManager;
+ this.conf = daemonConf;
+ this.split = split;
+ this.columnIds = columnIds;
+ this.allocSize = determineAllocSize(bufferManager, daemonConf);
+ boolean isInTest = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_IN_TEST);
+ this.targetSliceRowCount = HiveConf.getIntVar(
+ isInTest ? jobConf : daemonConf, ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT);
+ LOG.info("TODO# targetSliceRowCount = " + targetSliceRowCount);
+ if (this.columnIds != null) {
+ Collections.sort(this.columnIds);
+ }
+ this.consumer = consumer;
+ this.counters = counters;
+ try {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ fs = split.getPath().getFileSystem(daemonConf);
+ fileKey = determineFileId(fs, split,
+ HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+ this.sourceInputFormat = sourceInputFormat;
+ this.sourceSerDe = sourceSerDe;
+ this.reporter = reporter;
+ this.jobConf = jobConf;
+ this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
+ }
+
+ private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
+ long allocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_ENCODE_ALLOC_SIZE);
+ int maxAllocSize = bufferManager.getAllocator().getMaxAllocation();
+ if (allocSize > maxAllocSize) {
+ LOG.error("Encode allocation size " + allocSize + " is being capped to the maximum "
+ + "allocation size " + bufferManager.getAllocator().getMaxAllocation());
+ allocSize = maxAllocSize;
+ }
+ return (int)allocSize;
+ }
+
+ @Override
+ public void stop() {
+ LOG.debug("Encoded reader is being stopped");
+ isStopped = true;
+ }
+
+ @Override
+ public void pause() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unpause() {
+ throw new UnsupportedOperationException();
+ }
+
+ // TODO: move to base class?
+ @Override
+ protected Void callInternal() throws IOException, InterruptedException {
+ return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ return performDataRead();
+ }
+ });
+ }
+
+ private static final class LineRrOffsetReader extends PassThruOffsetReader {
+ private static final Method isCompressedMethod;
+ private final LineRecordReader lrReader;
+ private final LongWritable posKey;
+
+ static {
+ Method isCompressedMethodTmp;
+ try {
+ isCompressedMethodTmp = LineRecordReader.class.getDeclaredMethod("isCompressedInput");
+ isCompressedMethodTmp.setAccessible(true);
+ } catch (Throwable t) {
+ isCompressedMethodTmp = null;
+ LOG.info("TODO# cannot get LineRecordReader isCompressedInput method", t);
+ }
+ isCompressedMethod = isCompressedMethodTmp;
+ }
+
+ static ReaderWithOffsets create(LineRecordReader sourceReader) {
+ if (isCompressedMethod == null) return null;
+ Boolean isCompressed = null;
+ try {
+ isCompressed = (Boolean)isCompressedMethod.invoke(sourceReader);
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ LOG.info("TODO# cannot check the reader for compression", e);
+ return new PassThruOffsetReader(sourceReader);
+ }
+ if (isCompressed) return null; // Cannot slice compressed files.
+ return new LineRrOffsetReader(sourceReader);
+ }
+
+ private LineRrOffsetReader(LineRecordReader sourceReader) {
+ super(sourceReader);
+ this.lrReader = sourceReader;
+ this.posKey = (LongWritable)key;
+ }
+
+ @Override
+ public long getCurrentRowStartOffset() {
+ return posKey.get();
+ }
+
+ @Override
+ public long getCurrentFileOffset() {
+ try {
+ return lrReader.getPos();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ private static class PassThruOffsetReader implements ReaderWithOffsets {
+ protected final RecordReader sourceReader;
+ protected final Object key;
+ protected final Writable value;
+
+ private PassThruOffsetReader(RecordReader sourceReader) {
+ this.sourceReader = sourceReader;
+ key = sourceReader.createKey();
+ value = (Writable)sourceReader.createValue();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return sourceReader.next(key, value);
+ }
+
+ @Override
+ public Writable getCurrentValue() {
+ return value;
+ }
+
+ @Override
+ public void close() throws IOException {
+ sourceReader.close();
+ }
+
+ @Override
+ public long getCurrentRowStartOffset() {
+ return -1;
+ }
+
+ @Override
+ public long getCurrentFileOffset() {
+ return -1;
+ }
+ }
+
+ public static class CacheOutStream extends OutStream {
+ private final CacheOutputReceiver receiver;
+ public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
+ CacheOutputReceiver receiver) throws IOException {
+ super(name, bufferSize, codec, receiver);
+ this.receiver = receiver;
+ }
+
+ @Override
+ public void clear() throws IOException {
+ super.clear();
+ receiver.clear();
+ }
+ }
+
+ private interface ReaderWithOffsets {
+ boolean next() throws IOException;
+ Writable getCurrentValue();
+ long getCurrentRowStartOffset();
+ void close() throws IOException;
+ long getCurrentFileOffset();
+ }
+
+ public static class CacheWriter implements PhysicalWriter {
+ // Struct.
+ private static class CacheStreamData {
+ private final List<MemoryBuffer> data;
+ private final boolean isSuppressed;
+ private final StreamName name;
+ public CacheStreamData(boolean isSuppressed, StreamName name, List<MemoryBuffer> data) {
+ this.isSuppressed = isSuppressed;
+ this.name = name;
+ this.data = data;
+ }
+ @Override
+ public String toString() {
+ return "CacheStreamData [name=" + name + ", isSuppressed="
+ + isSuppressed + ", data=" + toString(data) + "]";
+ }
+ private static String toString(List<MemoryBuffer> data) {
+ String s = "";
+ for (MemoryBuffer buffer : data) {
+ s += LlapDataBuffer.toDataString(buffer) + ", ";
+ }
+ return s;
+ }
+ }
+
+ private static class CacheStripeData {
+ private List<ColumnEncoding> encodings;
+ private long rowCount = -1;
+ private long knownTornStart, firstRowStart, lastRowStart, lastRowEnd;
+ private Map<Integer, List<CacheStreamData>> colStreams = new HashMap<>();
+ @Override
+ public String toString() {
+ return ("{disk data knownTornStart=" + knownTornStart
+ + ", firstRowStart=" + firstRowStart + ", lastRowStart="
+ + lastRowStart + ", lastRowEnd=" + lastRowEnd + ", rowCount=" + rowCount
+ + ", encodings=" + encodings + ", streams=" + colStreams + "}").replace('\n', ' ');
+ }
+
+ public String toCoordinateString() {
+ return "knownTornStart=" + knownTornStart + ", firstRowStart=" + firstRowStart
+ + ", lastRowStart=" + lastRowStart + ", lastRowEnd=" + lastRowEnd;
+ }
+ }
+
+ private CacheStripeData currentStripe;
+ private final List<CacheStripeData> stripes = new ArrayList<>();
+ private final BufferUsageManager bufferManager;
+ private final int bufferSize;
+ private final List<Integer> columnIds;
+ private final boolean[] writerIncludes;
+ // These are global since ORC reuses objects between stripes.
+ private final Map<StreamName, OutStream> streams = new HashMap<>();
+ private final Map<Integer, List<CacheOutStream>> colStreams = new HashMap<>();
+
+ public CacheWriter(BufferUsageManager bufferManager, int bufferSize, List<Integer> columnIds,
+ boolean[] writerIncludes) {
+ this.bufferManager = bufferManager;
+ this.bufferSize = bufferSize;
+ this.columnIds = columnIds;
+ this.writerIncludes = writerIncludes;
+ startStripe();
+ }
+
+ private void startStripe() {
+ if (currentStripe != null) {
+ stripes.add(currentStripe);
+ }
+ currentStripe = new CacheStripeData();
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ }
+
+ @Override
+ public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+ OrcProto.Metadata metadata = builder.build();
+ // LOG.info("TODO# Processing file metadata " + metadata);
+ }
+
+ @Override
+ public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+ OrcProto.Footer footer = builder.build();
+ // LOG.info("TODO# Processing file footer " + footer);
+ validateIncludes(footer);
+ }
+
+ public void validateIncludes(OrcProto.Footer footer) throws IOException {
+ boolean[] translatedIncludes = columnIds == null ? null : OrcInputFormat.genIncludedColumns(
+ OrcUtils.convertTypeFromProtobuf(footer.getTypesList(), 0), columnIds);
+ if (translatedIncludes == null && writerIncludes == null) return;
+ if (translatedIncludes == null || writerIncludes == null) {
+ throwIncludesMismatchError(translatedIncludes);
+ }
+ int len = Math.min(translatedIncludes.length, writerIncludes.length);
+ for (int i = 0; i < len; ++i) {
+ // Translated includes may be a superset of writer includes due to cache.
+ if (!translatedIncludes[i] && writerIncludes[i]) {
+ throwIncludesMismatchError(translatedIncludes);
+ }
+ }
+ if (translatedIncludes.length < writerIncludes.length) {
+ for (int i = len; i < writerIncludes.length; ++i) {
+ if (writerIncludes[i]) {
+ throwIncludesMismatchError(translatedIncludes);
+ }
+ }
+ }
+
+ }
+
+ private String throwIncludesMismatchError(boolean[] translated) throws IOException {
+ String s = "Includes derived from the original table: " + DebugUtils.toString(writerIncludes)
+ + " but the ones derived from writer types are: " + DebugUtils.toString(translated);
+ LOG.error(s);
+ throw new IOException(s);
+ }
+
+ @Override
+ public void writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+ // LOG.info("TODO# Ignoring post script " + builder.build());
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Closed from ORC writer, we still need the data. Do not discard anything.
+ }
+
+ public void discardData() {
+ LOG.info("TODO# discarding disk data (if any wasn't cached)");
+ for (CacheStripeData stripe : stripes) {
+ if (stripe.colStreams == null || stripe.colStreams.isEmpty()) continue;
+ for (List<CacheStreamData> streams : stripe.colStreams.values()) {
+ for (CacheStreamData cos : streams) {
+ for (MemoryBuffer buffer : cos.data) {
+ bufferManager.getAllocator().deallocate(buffer);
+ }
+ }
+ }
+ stripe.colStreams.clear();
+ }
+ }
+
+ @Override
+ public long getPhysicalStripeSize() {
+ return 0; // Always 0, no memory checks.
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return false;
+ }
+
+ @Override
+ public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+ OutStream os = streams.get(name);
+ if (os != null) return os;
+ if (isNeeded(name)) {
+ LOG.info("Creating cache receiver for " + name);
+ CacheOutputReceiver or = new CacheOutputReceiver(bufferManager, name);
+ CacheOutStream cos = new CacheOutStream(name.toString(), bufferSize, null, or);
+ os = cos;
+ List<CacheOutStream> list = colStreams.get(name.getColumn());
+ if (list == null) {
+ list = new ArrayList<>();
+ colStreams.put(name.getColumn(), list);
+ }
+ list.add(cos);
+ } else {
+ LOG.info("Creating null receiver for " + name);
+ OutputReceiver or = new NullOutputReceiver(name);
+ os = new OutStream(name.toString(), bufferSize, null, or);
+ }
+ streams.put(name, os);
+ return os;
+ }
+
+ @Override
+ public void finalizeStripe(
+ OrcProto.StripeFooter.Builder footer,
+ OrcProto.StripeInformation.Builder dirEntry)
+ throws IOException {
+ List<ColumnEncoding> allEnc = footer.getColumnsList();
+ OrcProto.StripeInformation si = dirEntry.build();
+ LOG.info(("TODO# Finalizing stripe " + footer.build() + " => " + si).replace('\n', ' '));
+ currentStripe.encodings = new ArrayList<>(allEnc);
+ for (int i = 0; i < currentStripe.encodings.size(); ++i) {
+ // Don't record encodings for unneeded columns.
+ if (writerIncludes == null || writerIncludes[i]) continue;
+ currentStripe.encodings.set(i, null);
+ }
+ currentStripe.rowCount = si.getNumberOfRows();
+ // ORC writer reuses streams, so we need to clean them here and extract data.
+ for (Map.Entry<Integer, List<CacheOutStream>> e : colStreams.entrySet()) {
+ int colIx = e.getKey();
+ List<CacheOutStream> streams = e.getValue();
+ List<CacheStreamData> data = new ArrayList<>(streams.size());
+ for (CacheOutStream stream : streams) {
+ stream.flush();
+ List<MemoryBuffer> buffers = stream.receiver.buffers;
+ if (buffers == null) {
+ LOG.info("TODO# buffers are null for " + stream.receiver.name);
+ }
+ data.add(new CacheStreamData(stream.isSuppressed(), stream.receiver.name,
+ buffers == null ? new ArrayList<MemoryBuffer>() : new ArrayList<>(buffers)));
+ stream.clear();
+ }
+ currentStripe.colStreams.put(colIx, data);
+ }
+ startStripe();
+ }
+
+ @Override
+ public long estimateMemory() {
+ return 0; // We never ever use any memory.
+ }
+
+ @Override
+ public void writeIndexStream(StreamName name,
+ OrcProto.RowIndex.Builder rowIndex) throws IOException {
+ if (isNeeded(name)) {
+ // LOG.info("TODO# Saving index " + name);
+ // currentStripe.indexes.put(name.getColumn(), rowIndex.build());
+ } else {
+ // LOG.info("TODO# Ignoring index " + name + " => " + rowIndex);
+ }
+ }
+
+ private boolean isNeeded(StreamName name) {
+ return writerIncludes == null || writerIncludes[name.getColumn()];
+ }
+
+ @Override
+ public void writeBloomFilterStream(StreamName streamName,
+ OrcProto.BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+ // LOG.info("TODO# Ignoring bloom filter " + streamName + " => " + bloomFilterIndex);
+ }
+
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public long getRawWriterPosition() throws IOException {
+ return -1; // Meaningless for this writer.
+ }
+
+ @Override
+ public void appendRawStripe(byte[] stripe, int offset, int length,
+ OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+ throw new UnsupportedOperationException(); // Only used in ACID writer.
+ }
+
+ public void setCurrentStripeOffsets(long currentKnownTornStart,
+ long firstStartOffset, long lastStartOffset, long currentFileOffset) {
+ currentStripe.knownTornStart = currentKnownTornStart;
+ currentStripe.firstRowStart = firstStartOffset;
+ currentStripe.lastRowStart = lastStartOffset;
+ currentStripe.lastRowEnd = currentFileOffset;
+ }
+ }
+
+ private interface CacheOutput {
+ List<MemoryBuffer> getData();
+ StreamName getName();
+ }
+
+ private static final class CacheOutputReceiver implements CacheOutput, OutputReceiver {
+ private final BufferUsageManager bufferManager;
+ private final StreamName name;
+ private List<MemoryBuffer> buffers = null;
+ private int lastBufferPos = -1;
+
+ public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) {
+ this.bufferManager = bufferManager;
+ this.name = name;
+ }
+
+ public void clear() {
+ buffers = null;
+ lastBufferPos = -1;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ // TODO: avoid put() by working directly in OutStream?
+ LOG.info(name + " receiving a buffer of size " + buffer.remaining());
+ int size = buffer.remaining();
+ ByteBuffer bb = null;
+ if (buffers == null) {
+ buffers = new ArrayList<>();
+ }
+ if (!buffers.isEmpty()) {
+ MemoryBuffer lastBuffer = buffers.get(buffers.size() - 1);
+ bb = lastBuffer.getByteBufferRaw();
+ int written = lastBufferPos - bb.position();
+ if (bb.remaining() - written < size) {
+ lastBufferPos = -1;
+ bb = null;
+ }
+ }
+ boolean isNewBuffer = (lastBufferPos == -1);
+ if (isNewBuffer) {
+ MemoryBuffer[] dest = new MemoryBuffer[1];
+ bufferManager.getAllocator().allocateMultiple(dest, size);
+ LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
+ bb = newBuffer.getByteBufferRaw();
+ lastBufferPos = bb.position();
+ buffers.add(newBuffer);
+ }
+ // Since there's no close() here, maintain the initial read position between writes.
+ int pos = bb.position();
+ bb.position(lastBufferPos);
+ bb.put(buffer);
+ lastBufferPos = bb.position();
+ bb.position(pos);
+ }
+
+ @Override
+ public List<MemoryBuffer> getData() {
+ return buffers;
+ }
+
+ @Override
+ public StreamName getName() {
+ return name;
+ }
+ }
+
+ private static class NullOutputReceiver implements OutputReceiver {
+ private final StreamName name;
+
+ public NullOutputReceiver(StreamName name) {
+ this.name = name;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ LOG.info(name + " discarding a buffer of size " + buffer.remaining());
+ }
+ }
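  // For contrast with the two receivers above: the OutputReceiver contract is
  // small - ORC's OutStream hands over filled ByteBuffers via output(). A rough,
  // illustrative receiver (not part of this patch) that only measures the encoded
  // size of a stream could look like this:
  private static class CountingOutputReceiver implements OutputReceiver {
    private long byteCount = 0;

    @Override
    public void output(ByteBuffer buffer) throws IOException {
      // Only observe the size; the buffer contents are ignored.
      byteCount += buffer.remaining();
    }

    public long getByteCount() {
      return byteCount;
    }
  }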
+
+ protected Void performDataRead() throws IOException {
+ try {
+ long startTime = counters.startTimeCounter();
+ LlapIoImpl.LOG.info("Processing data for {}", split.getPath());
+ if (processStop()) {
+ recordReaderTime(startTime);
+ return null;
+ }
+
+ Boolean isFromCache = null;
+ try {
+ isFromCache = readFileWithCache(startTime);
+ } finally {
+ if (cachedData != null && cachedData.getData() != null) {
+ for (StripeData sd : cachedData.getData()) {
+ unlockAllBuffers(sd);
+ }
+ }
+ }
+ if (isFromCache == null) return null; // Stop requested, and handled inside.
+ if (!isFromCache) {
+ if (!processOneFileSplit(split, startTime, Ref.from(0), null)) return null;
+ }
+
+ // Done with all the things.
+ recordReaderTime(startTime);
+ LOG.info("TODO# calling setDone");
+ consumer.setDone();
+
+ LlapIoImpl.LOG.trace("done processing {}", split);
+ return null;
+ } catch (Throwable e) {
+ LOG.error("TODO# threw", e);
+ consumer.setError(e);
+ throw e;
+ } finally {
+ cleanupReaders();
+ }
+ }
+
+ private void unlockAllBuffers(StripeData si) {
+ for (int i = 0; i < si.getData().length; ++i) {
+ LlapDataBuffer[][] colData = si.getData()[i];
+ if (colData == null) continue;
+ for (int j = 0; j < colData.length; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) continue;
+ for (int k = 0; k < streamData.length; ++k) {
+ bufferManager.decRefBuffer(streamData[k]);
+ }
+ }
+ }
+ }
+
+ public void cacheFileData(StripeData sd) {
+ if (sd == null || sd.getEncodings() == null) return;
+ if (fileKey != null) {
+ // Note that we cache each slice separately. We could cache them together at the end, but
+ // then we won't be able to pass them to users without inc-refing explicitly.
+ ColumnEncoding[] encodings = sd.getEncodings();
+ for (int i = 0; i < encodings.length; ++i) {
+ // Make data consistent with encodings, don't store useless information.
+ if (sd.getData()[i] == null) {
+ encodings[i] = null;
+ }
+ }
+ FileData fd = new FileData(fileKey, encodings.length);
+ fd.addStripe(sd);
+ cache.putFileData(fd, Priority.NORMAL, counters);
+ } else {
+ lockAllBuffers(sd);
+ }
+ // We assume that if put/lock throws in the middle, it's ok to treat buffers as not being
+ // locked and to blindly deallocate them, since they are not going to be used. Therefore
+ // we don't remove them from the cleanup list - we will do it after sending to consumer.
+ // This relies on sequence of calls to cacheFileData and sendEcb..
+ }
+
+
+ private void lockAllBuffers(StripeData sd) {
+ for (int i = 0; i < sd.getData().length; ++i) {
+ LlapDataBuffer[][] colData = sd.getData()[i];
+ if (colData == null) continue;
+ for (int j = 0; j < colData.length; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) continue;
+ for (int k = 0; k < streamData.length; ++k) {
+ boolean canLock = bufferManager.incRefBuffer(streamData[k]);
+ assert canLock;
+ }
+ }
+ }
+ }
+
+ public Boolean readFileWithCache(long startTime) throws IOException {
+ if (fileKey == null) return false;
+ BooleanRef gotAllData = new BooleanRef();
+ long endOfSplit = split.getStart() + split.getLength();
+ this.cachedData = cache.getFileData(fileKey, split.getStart(),
+ endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
+ if (cachedData == null) {
+ LOG.info("TODO# no data for the split found in cache");
+ return false;
+ }
+ String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
+ List<StripeData> slices = cachedData.getData();
+ if (slices.isEmpty()) return false;
+ long uncachedPrefixEnd = slices.get(0).getKnownTornStart(),
+ uncachedSuffixStart = slices.get(slices.size() - 1).getLastEnd();
+ Ref<Integer> stripeIx = Ref.from(0);
+ if (uncachedPrefixEnd > split.getStart()) {
+ // TODO: can we merge neighboring splits? So we don't init so many readers.
+ FileSplit sliceSplit = new FileSplit(split.getPath(), split.getStart(),
+ uncachedPrefixEnd - split.getStart(), hosts, inMemoryHosts);
+ if (!processOneFileSplit(sliceSplit, startTime, stripeIx, null)) return null;
+ }
+ while (!slices.isEmpty()) {
+ StripeData slice = slices.get(0);
+ long start = slice.getKnownTornStart();
+ long len = slice.getLastStart() - start; // Will also read the last row.
+ FileSplit sliceSplit = new FileSplit(split.getPath(), start, len, hosts, inMemoryHosts);
+ if (!processOneFileSplit(sliceSplit, startTime, stripeIx, slice)) return null;
+ }
+ boolean isUnfortunate = false;
+ if (uncachedSuffixStart == endOfSplit) {
+ // This is rather obscure. The end of last row cached is precisely at the split end offset.
+ // If the split is in the middle of the file, LRR would read one more row after that,
+ // therefore as unfortunate as it is, we have to do a one-row read. However, for that to
+ // have happened, someone should have supplied a split that ends inside the last row, i.e.
+ // a few bytes earlier than the current split, which is pretty unlikely. What is more likely
+ // is that the split, and the last row, both end at the end of file. Check for this.
+ long size = split.getPath().getFileSystem(conf).getFileStatus(split.getPath()).getLen();
+ isUnfortunate = size > endOfSplit;
+ if (isUnfortunate) {
+ LOG.info("TODO# one-row mismatch at the end of split " + split.getPath() + " at "
+ + endOfSplit + "; file size is " + size);
+ }
+ }
+
+ if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
+ // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+ FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
+ endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
+ if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
+ }
+ return true;
+ }
+
+ public boolean processOneFileSplit(FileSplit split, long startTime,
+ Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
+ ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+ LlapIoImpl.LOG.info("TODO# Processing one split {" + split.getPath() + ", "
+ + split.getStart() + ", " + split.getLength() + "}; cache data " + slice);
+ boolean[] splitIncludes = writerIncludes;
+ boolean hasAllData = false;
+ if (cacheEncodings != null) {
+ hasAllData = true;
+ splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+ for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+ if (!splitIncludes[colIx]) continue;
+ assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
+ if (cacheEncodings[colIx] != null) {
+ splitIncludes[colIx] = false;
+ } else {
+ hasAllData = false;
+ }
+ }
+ }
+ LOG.info("TODO# includes accounting for cached data: before " + DebugUtils.toString(
+ writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
+
+ // We have 3 cases here:
+ // 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
+ // 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
+ // 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
+ if (!hasAllData) {
+ // This initializes cacheWriter with data.
+ readSplitFromFile(split, splitIncludes, slice);
+ assert cacheWriter != null;
+ }
+ if (slice != null) {
+ // If we had a cache range already, it should not have been split.
+ assert cacheWriter == null || cacheWriter.stripes.size() == 1;
+ CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+ boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
+ ++stripeIxRef.value;
+ return result;
+ } else {
+ for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+ if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+ return false;
+ }
+ ++stripeIxRef.value;
+ }
+ return true;
+ }
+ }
+
+ private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
+ int stripeIx, StripeData slice, long startTime) throws IOException {
+ String sliceStr = slice == null ? "null" : slice.toCoordinateString();
+ LOG.info("TODO# processing slice #" + stripeIx + " " + sliceStr + "; has"
+ + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
+ + " disk data");
+
+ ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+ LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
+ long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+ TextStripeMetadata metadata = new TextStripeMetadata(stripeIx);
+ StripeData sliceToCache = null;
+ boolean hasAllData = csd == null;
+ if (!hasAllData) {
+ if (slice == null) {
+ sliceToCache = new StripeData(
+ csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
+ csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
+ } else {
+ if (csd.rowCount != slice.getRowCount()) {
+ throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
+ + slice.getRowCount() + " from " + csd + " and " + slice);
+ }
+ if (csd.encodings.size() != slice.getEncodings().length) {
+ throw new IOException("Column count mismatch; disk " + csd.encodings.size()
+ + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
+ }
+ LOG.info("TODO# creating slice to cache in addition to an existing slice "
+ + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
+ sliceToCache = StripeData.duplicateForResults(slice);
+ for (int i = 0; i < csd.encodings.size(); ++i) {
+ sliceToCache.getEncodings()[i] = csd.encodings.get(i);
+ }
+ sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
+ }
+ metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
+ metadata.setRowCount(csd.rowCount);
+ } else {
+ assert cacheWriter == null;
+ metadata.setEncodings(Lists.newArrayList(cacheEncodings));
+ metadata.setRowCount(cacheRowCount);
+ }
+ LOG.info("TODO# derived stripe metadata for this split is " + metadata);
+ consumer.setStripeMetadata(metadata);
+
+ OrcEncodedColumnBatch ecb = ECB_POOL.take();
+ ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+ for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+ if (!writerIncludes[colIx]) continue;
+ ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+ if (!hasAllData && splitIncludes[colIx]) {
+ // The column has been read from disk.
+ List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
+ LOG.info("TODO# processing streams for column " + colIx + ": " + streams);
+ LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+ = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+ if (streams == null) continue; // Struct column, such as root?
+ Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+ while (iter.hasNext()) {
+ CacheWriter.CacheStreamData stream = iter.next();
+ if (stream.isSuppressed) {
+ LOG.info("TODO# removing a suppressed stream " + stream.name);
+ iter.remove();
+ discardUncachedBuffers(stream.data);
+ continue;
+ }
+ // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+ ColumnStreamData cb = CSD_POOL.take();
+ cb.incRef();
+ int streamIx = stream.name.getKind().getNumber();
+ cb.setCacheBuffers(stream.data);
+ // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+ newCacheDataForCol[streamIx] = stream.data.toArray(
+ new LlapDataBuffer[stream.data.size()]);
+ ecb.setStreamData(colIx, streamIx, cb);
+ }
+ } else {
+ // The column has been obtained from cache.
+ LlapDataBuffer[][] colData = cacheData[colIx];
+ LOG.info("TODO# processing cache data for column " + colIx + ": " + SerDeLowLevelCacheImpl.toString(colData));
+ for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+ if (colData[streamIx] == null) continue;
+ ColumnStreamData cb = CSD_POOL.take();
+ cb.incRef();
+ cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+ ecb.setStreamData(colIx, streamIx, cb);
+ }
+ }
+ }
+ if (processStop()) {
+ recordReaderTime(startTime);
+ return false;
+ }
+ // Note: we cache slices one by one since we need to lock them before sending to consumer.
+ // We could lock here, then cache them together, then unlock here and in return,
+ // but for now just rely on the cache put to lock them before we send them over.
+ LOG.info("TODO# Data to cache from the read " + sliceToCache);
+ cacheFileData(sliceToCache);
+ return sendEcbToConsumer(ecb, slice != null, csd);
+ }
+
+ private void discardUncachedBuffers(List<MemoryBuffer> list) {
+ for (MemoryBuffer buffer : list) {
+ boolean isInvalidated = ((LlapDataBuffer)buffer).invalidate();
+ assert isInvalidated;
+ bufferManager.getAllocator().deallocate(buffer);
+ }
+ }
+
+ private static List<ColumnEncoding> combineCacheAndWriterEncodings(
+ ColumnEncoding[] cacheEncodings, List<ColumnEncoding> writerEncodings) throws IOException {
+ // TODO: refactor with cache impl? it has the same merge logic
+ if (cacheEncodings == null) {
+ return new ArrayList<>(writerEncodings);
+ }
+ if (cacheEncodings.length != writerEncodings.size()) {
+ throw new IOException("Incompatible encoding lengths: "
+ + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+ }
+ ColumnEncoding[] combinedEncodings = Arrays.copyOf(cacheEncodings, cacheEncodings.length);
+ for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+ ColumnEncoding newEncoding = writerEncodings.get(colIx);
+ if (newEncoding == null) continue;
+ if (combinedEncodings[colIx] != null && !newEncoding.equals(combinedEncodings[colIx])) {
+ throw new IOException("Incompatible encodings at " + colIx + ": "
+ + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+ }
+ combinedEncodings[colIx] = newEncoding;
+ }
+ return Lists.newArrayList(combinedEncodings);
+ }
+
+
+ public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
+ throws IOException {
+ boolean maySplitTheSplit = slice == null;
+ ReaderWithOffsets offsetReader = null;
+ @SuppressWarnings("rawtypes")
+ RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
+ try {
+ LOG.info("TODO# using " + sourceReader.getClass().getSimpleName() + " to read data");
+ // TODO# need a more general approach to this. At least, need to factor this out and add configs.
+ if (sourceReader instanceof LineRecordReader) {
+ offsetReader = LineRrOffsetReader.create((LineRecordReader)sourceReader);
+ maySplitTheSplit = maySplitTheSplit && offsetReader != null;
+ sourceReader = null;
+ } else {
+ offsetReader = new PassThruOffsetReader(sourceReader);
+ sourceReader = null;
+ }
+ ObjectInspector sourceOi;
+ try {
+ sourceOi = sourceSerDe.getObjectInspector();
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+
+ // TODO: ideally, we want to transform the rows to only have the included columns, and
+ // only write those to the writer, with modified schema; then map back to full set later.
+ WriterOptions opts = OrcFile.writerOptions(conf)
+ .stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+ .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
+ .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
+ .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi);
+
+
+ // Column IDs are only used to verify eventual writer includes.
+ cacheWriter = new CacheWriter(bufferManager, allocSize, columnIds, splitIncludes);
+ orcWriter = new WriterImpl(cacheWriter, null, opts);
+ int rowsPerSlice = 0;
+ long currentKnownTornStart = split.getStart();
+ long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+ while (offsetReader.next()) {
+ Writable value = offsetReader.getCurrentValue();
+ lastStartOffset = offsetReader.getCurrentRowStartOffset();
+ if (firstStartOffset == Long.MIN_VALUE) {
+ firstStartOffset = lastStartOffset;
+ }
+ Object row = null;
+ try {
+ row = sourceSerDe.deserialize(value);
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+ orcWriter.addRow(row);
+ if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+ long fileOffset = offsetReader.getCurrentFileOffset();
+ // Must support offsets to be able to split.
+ if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+ throw new AssertionError("Unable to get offsets from "
+ + offsetReader.getClass().getSimpleName());
+ }
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ // Split starting at row start will not read that row.
+ currentKnownTornStart = lastStartOffset;
+ // Row offsets will be determined from the reader (we could set the first from last).
+ lastStartOffset = Long.MIN_VALUE;
+ firstStartOffset = Long.MIN_VALUE;
+ rowsPerSlice = 0;
+ orcWriter.writeIntermediateFooter();
+ }
+ }
+ if (rowsPerSlice > 0) {
+ long fileOffset = offsetReader.getCurrentFileOffset();
+ if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+ // The reader doesn't support offsets.
+ // TODO## Double-check if these shenanigans are correct wrt cache matching.
+ // We want to match the exact same splits, and not match anything else ever.
+ // Perhaps we should just add a flag that would allow us to match exactly.
+ // If cached split was starting at row start, that row would be skipped, so +1
+ firstStartOffset = split.getStart() + 1;
+ // Last row starting at the end of the split would be read.
+ lastStartOffset = split.getStart() + split.getLength();
+ // However, it must end after the split end, otherwise the next one would have been read.
+ fileOffset = lastStartOffset + 1;
+ LOG.info("TODO# setting fake cache offsets based on split offsets - 'first row' at "
+ + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+ }
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ }
+ // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
+ orcWriter.close();
+ orcWriter = null;
+ } finally {
+ // We don't need the source reader anymore.
+ if (offsetReader != null) {
+ try {
+ offsetReader.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close source reader", ex);
+ }
+ } else {
+ assert sourceReader != null;
+ try {
+ sourceReader.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close source reader", ex);
+ }
+ }
+ }
+ }
+
+ private static String[] extractHosts(FileSplit split, boolean isInMemory) throws IOException {
+ SplitLocationInfo[] locInfo = split.getLocationInfo();
+ if (locInfo == null) return new String[0];
+ List<String> hosts = null; // TODO: most of the time, there's no in-memory. Use an array?
+ for (int i = 0; i < locInfo.length; i++) {
+ if (locInfo[i].isInMemory() != isInMemory) continue;
+ if (hosts == null) {
+ hosts = new ArrayList<>();
+ }
+ hosts.add(locInfo[i].getLocation());
+ }
+ if (hosts == null) return new String[0];
+ return hosts.toArray(new String[hosts.size()]);
+ }
+
+ private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
+ boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+ if (ecb == null) { // This basically means stop has been called.
+ cleanupReaders();
+ return false;
+ }
+ LOG.info("TODO# Sending over the ecb");
+ try {
+ consumer.consumeData(ecb);
+ } catch (Throwable ex) {
+ LOG.info("TODO# threw", ex);
+ consumer.setError(ex); // TODO## this is wrong, it shouldn't throw
+ }
+ if (hasCachedSlice) {
+ cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
+ }
+ if (writerData != null) {
+ writerData.colStreams.clear();
+ }
+ return true;
+ }
+
+
+ private void cleanupReaders() {
+ if (orcWriter != null) {
+ try {
+ orcWriter.close();
+ orcWriter = null;
+ } catch (Exception ex) {
+ LOG.error("Failed to close ORC writer", ex);
+ }
+ }
+ if (cacheWriter != null) {
+ try {
+ cacheWriter.discardData();
+ cacheWriter = null;
+ } catch (Exception ex) {
+ LOG.error("Failed to close cache writer", ex);
+ }
+ }
+ }
+
+ private void recordReaderTime(long startTime) {
+ counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+ }
+
+ private boolean processStop() {
+ if (!isStopped) return false;
+ LOG.info("SerDe-based data reader is stopping");
+ cleanupReaders();
+ return true;
+ }
+
+ private static Object determineFileId(FileSystem fs, FileSplit split,
+ boolean allowSynthetic) throws IOException {
+ /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
+ Object fileKey = ((OrcSplit)split).getFileKey();
+ if (fileKey != null) return fileKey; */
+ LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
+ return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
+ }
+
+ // TODO: move to a superclass?
+ @Override
+ public void returnData(OrcEncodedColumnBatch ecb) {
+ for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+ if (!ecb.hasData(colIx)) continue;
+ ColumnStreamData[] datas = ecb.getColumnData(colIx);
+ for (ColumnStreamData data : datas) {
+ if (data == null || data.decRef() != 0) continue;
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ for (MemoryBuffer buf : data.getCacheBuffers()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} at the end of processing", buf);
+ }
+ }
+ bufferManager.decRefBuffers(data.getCacheBuffers());
+ CSD_POOL.offer(data);
+ }
+ }
+ // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
+ ECB_POOL.offer(ecb);
+ }
+
+ public TezCounters getTezCounters() {
+ return counters.getTezCounters();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
new file mode 100644
index 0000000..040f1a7
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public interface ConsumerFileMetadata {
+ int getStripeCount();
+ CompressionKind getCompressionKind();
+ List<Type> getTypes();
+ TypeDescription getSchema();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
new file mode 100644
index 0000000..1e28f5f
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+
+public interface ConsumerStripeMetadata {
+ int getStripeIx();
+ long getRowCount();
+ List<ColumnEncoding> getEncodings();
+ String getWriterTimezone();
+ RowIndexEntry getRowIndexEntry(int colIx, int rgIx); // TODO: remove?
+ RowIndex[] getRowIndexes();
+ boolean supportsRowIndexes();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 70cba05..2c7a234 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -41,7 +41,8 @@ import org.apache.orc.impl.ReaderImpl;
* of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
* or instead use protobuf structs everywhere instead of the mix of things like now.
*/
-public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMetadata {
+public final class OrcFileMetadata extends LlapCacheableBuffer
+ implements FileMetadata, ConsumerFileMetadata {
private final List<StripeInformation> stripes;
private final List<Integer> versionList;
private final List<OrcProto.StripeStatistics> stripeStats;
@@ -225,6 +226,11 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
return fileStats;
}
+ @Override
+ public int getStripeCount() {
+ return stripes.size();
+ }
+
public TypeDescription getSchema() {
return OrcUtils.convertTypeFromProtobuf(this.types, 0);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 3f4f43b..73a1721 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.MemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
public class OrcMetadataCache {
private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
@@ -116,7 +117,6 @@ public class OrcMetadataCache {
return touchOnGet(metadata.get(fileKey));
}
-
private <T extends LlapCacheableBuffer> T touchOnGet(T result) {
if (result != null) {
policy.notifyLock(result);
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 6f0b9ff..5ef1678 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -32,10 +32,11 @@ import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
import org.apache.orc.DataReader;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.RowIndexEntry;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.OrcIndex;
-public class OrcStripeMetadata extends LlapCacheableBuffer {
+public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata {
private final OrcBatchKey stripeKey;
private final List<OrcProto.ColumnEncoding> encodings;
private final List<OrcProto.Stream> streams;
@@ -172,4 +173,14 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
public void resetRowIndex() {
rowIndex = null;
}
+
+ @Override
+ public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+ return rowIndex.getRowGroupIndex()[colIx].getEntry(rgIx);
+ }
+
+ @Override
+ public boolean supportsRowIndexes() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index dc83b9c..4f02926 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.orc.OrcProto.Type.Builder;
import org.apache.orc.impl.ReaderImpl;
import com.google.common.collect.Lists;
@@ -538,4 +539,86 @@ public class OrcUtils {
}
return result;
}
+
+ public static List<TypeDescription> setTypeBuilderFromSchema(
+ OrcProto.Type.Builder type, TypeDescription schema) {
+ List<TypeDescription> children = schema.getChildren();
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(OrcProto.Type.Kind.VARCHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(schema.getPrecision());
+ type.setScale(schema.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(children.get(0).getId());
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ for(String field: schema.getFieldNames()) {
+ type.addFieldNames(field);
+ }
+ break;
+ case UNION:
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " +
+ schema.getCategory());
+ }
+ return children;
+ }
}
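
A small sketch (not part of this commit) of how the new OrcUtils.setTypeBuilderFromSchema helper is meant to be driven; it mirrors the writeTypes rewrite in WriterImpl further down in this patch, with an assumed example schema and class name:

    import java.util.List;

    import org.apache.orc.OrcProto;
    import org.apache.orc.OrcUtils;
    import org.apache.orc.TypeDescription;

    public class TypeBuilderSketch {
      // Flatten a schema into OrcProto.Type messages, one per schema node.
      static void addTypes(OrcProto.Footer.Builder footer, TypeDescription schema) {
        OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
        List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
        footer.addTypes(type);
        if (children != null) {
          for (TypeDescription child : children) {
            addTypes(footer, child);
          }
        }
      }

      public static void main(String[] args) {
        TypeDescription schema = TypeDescription.fromString("struct<ts:timestamp,line:string>");
        OrcProto.Footer.Builder footer = OrcProto.Footer.newBuilder();
        addTypes(footer, schema);
        System.out.println(footer.getTypesCount() + " type entries"); // 3 for this schema
      }
    }
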
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
index 83742e4..5ba1b9b 100644
--- a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
+++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
@@ -19,7 +19,6 @@
package org.apache.orc.impl;
import java.io.IOException;
-import java.util.EnumSet;
import org.apache.orc.OrcProto.BloomFilterIndex;
import org.apache.orc.OrcProto.Footer;
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 975804b..9433e54 100644
--- a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -232,6 +232,11 @@ public class RecordReaderImpl implements RecordReader {
public long getNext() {
return entry.getPositions(index++);
}
+
+ @Override
+ public String toString() {
+ return "{" + entry.getPositionsList() + "; " + index + "}";
+ }
}
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 484209b..4bb51c3 100644
--- a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1223,6 +1223,7 @@ public class TreeReaderFactory {
}
}
+ private static org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(TreeReaderFactory.class);
// This class collects together very similar methods for reading an ORC vector of byte arrays and
// creating the BytesColumnVector.
//
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b17fb41..e0fcae7 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -125,10 +125,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
public WriterImpl(FileSystem fs,
Path path,
OrcFile.WriterOptions opts) throws IOException {
- this.path = path;
+ this(new PhysicalFsWriter(fs, path, opts.getSchema().getMaximumId() + 1, opts), path, opts);
+ }
+
+ public WriterImpl(PhysicalWriter writer,
+ Path pathForMem,
+ OrcFile.WriterOptions opts) throws IOException {
+ this.physWriter = writer;
+ this.path = pathForMem;
this.conf = opts.getConfiguration();
- this.callback = opts.getCallback();
this.schema = opts.getSchema();
+ this.callback = opts.getCallback();
if (callback != null) {
callbackContext = new OrcFile.WriterContext(){
@@ -153,8 +160,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
}
this.bloomFilterFpp = opts.getBloomFilterFpp();
- int numColumns = schema.getMaximumId() + 1;
- physWriter = new PhysicalFsWriter(fs, path, numColumns, opts);
treeWriter = createTreeWriter(schema, streamFactory, false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
@@ -162,7 +167,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
// ensure that we are able to handle callbacks before we register ourselves
- memoryManager.addWriter(path, opts.getStripeSize(), this);
+ if (path != null) {
+ memoryManager.addWriter(path, opts.getStripeSize(), this);
+ }
}
@Override
@@ -2112,83 +2119,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private static void writeTypes(OrcProto.Footer.Builder builder,
TypeDescription schema) {
OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- List<TypeDescription> children = schema.getChildren();
- switch (schema.getCategory()) {
- case BOOLEAN:
- type.setKind(OrcProto.Type.Kind.BOOLEAN);
- break;
- case BYTE:
- type.setKind(OrcProto.Type.Kind.BYTE);
- break;
- case SHORT:
- type.setKind(OrcProto.Type.Kind.SHORT);
- break;
- case INT:
- type.setKind(OrcProto.Type.Kind.INT);
- break;
- case LONG:
- type.setKind(OrcProto.Type.Kind.LONG);
- break;
- case FLOAT:
- type.setKind(OrcProto.Type.Kind.FLOAT);
- break;
- case DOUBLE:
- type.setKind(OrcProto.Type.Kind.DOUBLE);
- break;
- case STRING:
- type.setKind(OrcProto.Type.Kind.STRING);
- break;
- case CHAR:
- type.setKind(OrcProto.Type.Kind.CHAR);
- type.setMaximumLength(schema.getMaxLength());
- break;
- case VARCHAR:
- type.setKind(OrcProto.Type.Kind.VARCHAR);
- type.setMaximumLength(schema.getMaxLength());
- break;
- case BINARY:
- type.setKind(OrcProto.Type.Kind.BINARY);
- break;
- case TIMESTAMP:
- type.setKind(OrcProto.Type.Kind.TIMESTAMP);
- break;
- case DATE:
- type.setKind(OrcProto.Type.Kind.DATE);
- break;
- case DECIMAL:
- type.setKind(OrcProto.Type.Kind.DECIMAL);
- type.setPrecision(schema.getPrecision());
- type.setScale(schema.getScale());
- break;
- case LIST:
- type.setKind(OrcProto.Type.Kind.LIST);
- type.addSubtypes(children.get(0).getId());
- break;
- case MAP:
- type.setKind(OrcProto.Type.Kind.MAP);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- break;
- case STRUCT:
- type.setKind(OrcProto.Type.Kind.STRUCT);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- for(String field: schema.getFieldNames()) {
- type.addFieldNames(field);
- }
- break;
- case UNION:
- type.setKind(OrcProto.Type.Kind.UNION);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown category: " +
- schema.getCategory());
- }
+ List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
builder.addTypes(type);
if (children != null) {
for(TypeDescription child: children) {
@@ -2380,7 +2311,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
callback.preFooterWrite(callbackContext);
}
// remove us from the memory manager so that we don't get any callbacks
- memoryManager.removeWriter(path);
+ if (path != null) {
+ memoryManager.removeWriter(path);
+ }
// actually close the file
flushStripe();
writeMetadata();
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 3d81e43..b9c39de 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.hive.llap;
*/
public class DebugUtils {
public static String toString(boolean[] a) {
+ if (a == null) return "null";
StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; i < a.length; ++i) {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..004bb2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -225,7 +225,7 @@ public class FetchOperator implements Serializable {
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return HiveInputFormat.wrapForLlap(format, conf);
+ return format;
}
private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 37e4b9b..46270bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -327,7 +327,7 @@ public final class Utilities {
return null;
}
- public static BaseWork getMergeWork(JobConf jconf) {
+ public static BaseWork getMergeWork(Configuration jconf) {
if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
|| (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
return null;
@@ -335,7 +335,7 @@ public final class Utilities {
return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
}
- public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+ public static BaseWork getMergeWork(Configuration jconf, String prefix) {
if (prefix == null || prefix.isEmpty()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index ba25573..de36f2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -19,11 +19,20 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileSplit;
@@ -42,6 +51,9 @@ import org.apache.hadoop.mapred.lib.CombineFileSplit;
*/
public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
extends HiveContextAwareRecordReader<K, V> {
+ private org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class);
+
+ private LinkedHashMap<Path, PartitionDesc> pathToPartInfo;
public CombineHiveRecordReader(InputSplit split, Configuration conf,
Reporter reporter, Integer partition, RecordReader preReader) throws IOException {
@@ -57,8 +69,27 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
throw new IOException("CombineHiveRecordReader: class not found "
+ inputFormatClassName);
}
- InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
- inputFormatClass, jobConf);
+ InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+ try {
+ // TODO: refactor this out
+ if (pathToPartInfo == null) {
+ MapWork mrwork;
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ mrwork = (MapWork) Utilities.getMergeWork(jobConf);
+ if (mrwork == null) {
+ mrwork = Utilities.getMapWork(jobConf);
+ }
+ } else {
+ mrwork = Utilities.getMapWork(jobConf);
+ }
+ pathToPartInfo = mrwork.getPathToPartitionInfo();
+ }
+
+ PartitionDesc part = extractSinglePartSpec(hsplit);
+ inputFormat = HiveInputFormat.wrapForLlap(inputFormat, jobConf, part);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
// create a split for the given partition
FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
@@ -79,6 +110,26 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
}
+ private PartitionDesc extractSinglePartSpec(CombineHiveInputSplit hsplit) throws IOException {
+ PartitionDesc part = null;
+ Map<Map<Path,PartitionDesc>, Map<Path,PartitionDesc>> cache = new HashMap<>();
+ for (Path path : hsplit.getPaths()) {
+ PartitionDesc otherPart = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ pathToPartInfo, path, cache);
+ LOG.debug("Found spec for " + path + " " + otherPart + " from " + pathToPartInfo);
+ if (part == null) {
+ part = otherPart;
+ } else if (otherPart != part) { // Assume we should have the exact same object.
+ // TODO: we could also compare the schema and SerDe, and pass only those to the call
+ // instead; most of the time these would be the same and LLAP IO can handle that.
+ LOG.warn("Multiple partitions found; not going to pass a part spec to LLAP IO: {"
+ + part.getPartSpec() + "} and {" + otherPart.getPartSpec() + "}");
+ return null;
+ }
+ }
+ return part;
+ }
+
@Override
public void doClose() throws IOException {
recordReader.close();
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 94fcd60..f8391e0 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -202,7 +204,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
public static InputFormat<WritableComparable, Writable> wrapForLlap(
- InputFormat<WritableComparable, Writable> inputFormat, Configuration conf) {
+ InputFormat<WritableComparable, Writable> inputFormat, Configuration conf,
+ PartitionDesc part) throws HiveException {
if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
return inputFormat; // LLAP not enabled, no-op.
}
@@ -227,7 +230,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
LOG.info("Not using LLAP IO because it is not initialized");
return inputFormat;
}
- return castInputFormat(llapIo.getInputFormat(inputFormat));
+ LlapWrappableInputFormatInterface llapIf = (LlapWrappableInputFormatInterface)inputFormat;
+ Deserializer serde = null;
+ if (llapIf.isSerdeBased()) {
+ if (part == null) {
+ LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF", new Exception());
+ return inputFormat;
+ }
+ try {
+ serde = part.getDeserializer(conf);
+ } catch (Exception e) {
+ throw new HiveException("Error creating SerDe for LLAP IO", e);
+ }
+ }
+ return castInputFormat(llapIo.getInputFormat(inputFormat, serde));
}
@@ -248,7 +264,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
return (InputFormat<T, U>)from;
}
-
+ /** NOTE: this no longer wraps the IF for LLAP. Call wrapForLlap manually if needed. */
public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
Class inputFormatClass, JobConf job) throws IOException {
InputFormat<WritableComparable, Writable> instance = inputFormats.get(inputFormatClass);
@@ -266,7 +282,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
+ inputFormatClass.getName() + " as specified in mapredWork!", e);
}
}
- return wrapForLlap(instance, job);
+ return instance;
}
public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -287,15 +303,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
boolean nonNative = false;
- PartitionDesc part = pathToPartitionInfo.get(hsplit.getPath());
+ PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ pathToPartitionInfo, hsplit.getPath(), null);
+ LOG.debug("Found spec for " + hsplit.getPath() + " " + part + " from " + pathToPartitionInfo);
+
if ((part != null) && (part.getTableDesc() != null)) {
Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), job);
nonNative = part.getTableDesc().isNonNative();
}
- pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(), nonNative);
+ Path splitPath = hsplit.getPath();
+ pushProjectionsAndFilters(job, inputFormatClass, splitPath, nonNative);
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ try {
+ inputFormat = HiveInputFormat.wrapForLlap(inputFormat, job, part);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
RecordReader innerReader = null;
try {
innerReader = inputFormat.getRecordReader(inputSplit, job, reporter);
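
Since getInputFormatFromCache no longer applies the LLAP wrapper, callers follow roughly the pattern below. This is a sketch modeled on the getRecordReader change in this hunk; the class, method, and locals are assumed for illustration, not part of the patch:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
    import org.apache.hadoop.hive.ql.io.HiveInputFormat;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.PartitionDesc;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    class LlapWrapCallerSketch {
      @SuppressWarnings({"rawtypes", "unchecked"})
      RecordReader openReader(Class inputFormatClass, JobConf job, FileSplit split,
          Reporter reporter, Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
        // Get the raw IF; wrapping for LLAP is now the caller's responsibility.
        InputFormat<WritableComparable, Writable> inputFormat =
            HiveInputFormat.getInputFormatFromCache(inputFormatClass, job);
        // Resolve the partition spec so SerDe-based formats can be wrapped too.
        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
            pathToPartitionInfo, split.getPath(), null);
        try {
          inputFormat = HiveInputFormat.wrapForLlap(inputFormat, job, part);
        } catch (HiveException e) {
          throw new IOException(e);
        }
        return inputFormat.getRecordReader(split, job, reporter);
      }
    }
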
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
index 66e1f90..f168f3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
@@ -18,5 +18,7 @@
package org.apache.hadoop.hive.ql.io;
-/** Marker interface for LLAP; serves no other purpose. */
-public interface LlapWrappableInputFormatInterface {}
+/** Marker interface for LLAP IO. */
+public interface LlapWrappableInputFormatInterface {
+ boolean isSerdeBased();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 361901e..2a89c03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -332,6 +332,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<Integer> included) {
boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+ if (included == null) {
+ Arrays.fill(result, true);
+ return result;
+ }
result[0] = true;
List<TypeDescription> children = readerSchema.getChildren();
for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
@@ -2482,4 +2486,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
return new OrcOiBatchToRowReader(vrr, vrbCtx, includedCols);
}
+
+
+ @Override
+ public boolean isSerdeBased() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 075c3b4..3e4ec2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -93,14 +93,28 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
OrcFile.WriterOptions opts) throws IOException {
super(fs, path, opts);
this.inspector = opts.getInspector();
- internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ this.fields = initializeFieldsFromOi(inspector);
+ }
+
+ public WriterImpl(PhysicalWriter writer,
+ Path pathForMem,
+ OrcFile.WriterOptions opts) throws IOException {
+ super(writer, pathForMem, opts);
+ this.inspector = opts.getInspector();
+ this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ this.fields = initializeFieldsFromOi(inspector);
+ }
+
+ private static StructField[] initializeFieldsFromOi(ObjectInspector inspector) {
if (inspector instanceof StructObjectInspector) {
List<? extends StructField> fieldList =
((StructObjectInspector) inspector).getAllStructFieldRefs();
- fields = new StructField[fieldList.size()];
+ StructField[] fields = new StructField[fieldList.size()];
fieldList.toArray(fields);
+ return fields;
} else {
- fields = null;
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index 2325140..8857d3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -83,4 +83,8 @@ public class CacheChunk extends DiskRangeList {
public void reset() {
init(null, -1, -1);
}
+
+ public void adjustEnd(long l) {
+ this.end += l;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index d5f5f9d..0dba1a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -1775,7 +1775,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
" present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
- data != null, dictionary != null, lengths != null, secondary != null, tz);
+ data, dictionary != null, lengths != null, secondary != null, tz);
}
switch (schema.getCategory()) {
case BINARY:
[4/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for
non-columnar formats in a somewhat general way (Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/682a3c7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/682a3c7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/682a3c7b
Branch: refs/heads/master-15147
Commit: 682a3c7b46aec9e43275551698fa6ba9c7ac5d7c
Parents: 7f46c8d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 16 18:57:28 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 16 18:57:28 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +
.../apache/hadoop/hive/llap/io/api/LlapIo.java | 3 +-
.../llap/IncrementalObjectSizeEstimator.java | 2 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 3 +-
.../hive/llap/cache/EvictionDispatcher.java | 11 +-
.../hadoop/hive/llap/cache/FileCache.java | 107 ++
.../hive/llap/cache/FileCacheCleanupThread.java | 104 ++
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 7 +
.../hive/llap/cache/LowLevelCacheImpl.java | 218 +--
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 716 ++++++++++
.../hive/llap/io/api/impl/LlapInputFormat.java | 348 +----
.../hive/llap/io/api/impl/LlapIoImpl.java | 26 +-
.../hive/llap/io/api/impl/LlapRecordReader.java | 335 +++++
.../llap/io/decode/ColumnVectorProducer.java | 11 +-
.../llap/io/decode/EncodedDataConsumer.java | 7 +
.../io/decode/GenericColumnVectorProducer.java | 201 +++
.../llap/io/decode/LlapTextInputFormat.java | 33 +
.../llap/io/decode/OrcColumnVectorProducer.java | 12 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 95 +-
.../hive/llap/io/decode/ReadPipeline.java | 2 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 1248 ++++++++++++++++++
.../llap/io/metadata/ConsumerFileMetadata.java | 31 +
.../io/metadata/ConsumerStripeMetadata.java | 35 +
.../hive/llap/io/metadata/OrcFileMetadata.java | 8 +-
.../hive/llap/io/metadata/OrcMetadataCache.java | 2 +-
.../llap/io/metadata/OrcStripeMetadata.java | 13 +-
orc/src/java/org/apache/orc/OrcUtils.java | 83 ++
.../org/apache/orc/impl/PhysicalWriter.java | 1 -
.../org/apache/orc/impl/RecordReaderImpl.java | 5 +
.../org/apache/orc/impl/TreeReaderFactory.java | 1 +
.../java/org/apache/orc/impl/WriterImpl.java | 99 +-
.../org/apache/hadoop/hive/llap/DebugUtils.java | 1 +
.../hadoop/hive/ql/exec/FetchOperator.java | 2 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 4 +-
.../hive/ql/io/CombineHiveRecordReader.java | 55 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 37 +-
.../io/LlapWrappableInputFormatInterface.java | 6 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 10 +
.../hadoop/hive/ql/io/orc/WriterImpl.java | 20 +-
.../hive/ql/io/orc/encoded/CacheChunk.java | 4 +
.../orc/encoded/EncodedTreeReaderFactory.java | 2 +-
.../hive/ql/io/orc/encoded/StreamUtils.java | 13 +-
.../ql/io/rcfile/stats/PartialScanMapper.java | 5 +-
ql/src/test/queries/clientpositive/llap_text.q | 62 +
.../test/results/clientpositive/llap_text.q.out | 502 +++++++
.../hadoop/hive/common/io/DiskRangeList.java | 6 +
.../common/io/encoded/EncodedColumnBatch.java | 17 +
47 files changed, 3898 insertions(+), 622 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..9806105 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2900,6 +2900,13 @@ public class HiveConf extends Configuration {
LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
"Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
"cases of file overwrites. This is supported on HDFS."),
+ LLAP_IO_ENCODE_ALLOC_SIZE("hive.llap.io.encode.alloc.size", "256Kb", new SizeValidator(),
+ "Allocation size for the buffers used to cache encoded data from non-ORC files. Must\n" +
+ "be a power of two between " + LLAP_ALLOCATOR_MIN_ALLOC + " and\n" +
+ LLAP_ALLOCATOR_MAX_ALLOC + "."),
+ LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
+ "Row count to use to separate cache slices when caching encoded data from row-based\n" +
+ "inputs into LLAP cache."),
LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
"Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
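
For reference, a minimal sketch (not part of this commit) of tuning the two new settings above from client code; the class name and values are illustrative, while the key names come from the HiveConf entries added in this hunk:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class LlapEncodeSettingsSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Buffer allocation size for encoded non-ORC data; must be a power of two
        // between the allocator min and max allocation sizes.
        conf.set("hive.llap.io.encode.alloc.size", "256Kb");
        // Rows per cache slice when encoding row-based input data into the LLAP cache.
        conf.setInt("hive.llap.io.encode.slice.row.count", 100000);
        System.out.println(conf.get("hive.llap.io.encode.alloc.size"));
      }
    }
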
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index d82757f..e5ab601 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -18,10 +18,11 @@
package org.apache.hadoop.hive.llap.io.api;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
public interface LlapIo<T> {
- InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat);
+ InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde);
void close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index 3efbcc2..ff6e7ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
* during the actual estimation. TODO: clean up
*/
public class IncrementalObjectSizeEstimator {
- private static final JavaDataModel memoryModel = JavaDataModel.get();
+ public static final JavaDataModel memoryModel = JavaDataModel.get();
private enum FieldType { PRIMITIVE_ARRAY, OBJECT_ARRAY, COLLECTION, MAP, OTHER };
public static HashMap<Class<?>, ObjectEstimator> createEstimators(Object rootObj) {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index d9d407d..8d7f0d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -611,7 +611,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
assert data != null;
int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
freeListIx = freeListFromHeader(headers[headerIx]);
- assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2);
+ assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
+ : buffer.allocSize + " " + freeListIx;
while (true) {
FreeList freeList = freeLists[freeListIx];
int bHeaderIx = headerIx ^ (1 << freeListIx);
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index b6fd3e3..2d3197c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -27,11 +27,16 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
*/
public final class EvictionDispatcher implements EvictionListener {
private final LowLevelCache dataCache;
+ private final SerDeLowLevelCacheImpl serdeCache;
private final OrcMetadataCache metadataCache;
+ private final EvictionAwareAllocator allocator;
- public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) {
+ public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
+ OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
this.dataCache = dataCache;
this.metadataCache = metadataCache;
+ this.serdeCache = serdeCache;
+ this.allocator = allocator;
}
@Override
@@ -40,7 +45,11 @@ public final class EvictionDispatcher implements EvictionListener {
}
public void notifyEvicted(LlapDataBuffer buffer) {
+ // Note: we don't know which cache this is from, so we notify both. They can noop if they
+ // want to find the buffer in their structures and can't.
dataCache.notifyEvicted(buffer);
+ serdeCache.notifyEvicted(buffer);
+ allocator.deallocateEvicted(buffer);
}
public void notifyEvicted(OrcFileMetadata buffer) {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
new file mode 100644
index 0000000..44b71c9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Function;
+
+/** Class used for a single file in LowLevelCacheImpl, etc. */
+class FileCache<T> {
+ private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+ private final T cache;
+ private final AtomicInteger refCount = new AtomicInteger(0);
+
+ private FileCache(T value) {
+ this.cache = value;
+ }
+
+ public T getCache() {
+ return cache;
+ }
+
+ boolean incRef() {
+ while (true) {
+ int value = refCount.get();
+ if (value == EVICTED_REFCOUNT) return false;
+ if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
+ assert value >= 0;
+ if (refCount.compareAndSet(value, value + 1)) return true;
+ }
+ }
+
+ void decRef() {
+ int value = refCount.decrementAndGet();
+ if (value < 0) {
+ throw new AssertionError("Unexpected refCount " + value);
+ }
+ }
+
+ boolean startEvicting() {
+ while (true) {
+ int value = refCount.get();
+ if (value != 1) return false;
+ if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+ }
+ }
+
+ void commitEvicting() {
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+ assert result;
+ }
+
+ void abortEvicting() {
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+ assert result;
+ }
+
+ /**
+ * All this mess is necessary because we want to be able to remove sub-caches for fully
+ * evicted files. It may actually be better to have a non-nested map with object keys?
+ */
+ public static <T> FileCache<T> getOrAddFileSubCache(
+ ConcurrentHashMap<Object, FileCache<T>> cache, Object fileKey,
+ Function<Void, T> createFunc) {
+ FileCache<T> newSubCache = null;
+ while (true) { // Overwhelmingly executes once.
+ FileCache<T> subCache = cache.get(fileKey);
+ if (subCache != null) {
+ if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+ if (newSubCache == null) {
+ newSubCache = new FileCache<T>(createFunc.apply(null));
+ newSubCache.incRef();
+ }
+ // Found a stale value we cannot incRef; try to replace it with new value.
+ if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
+ continue; // Someone else replaced/removed a stale value, try again.
+ }
+ // No value found.
+ if (newSubCache == null) {
+ newSubCache = new FileCache<T>(createFunc.apply(null));
+ newSubCache.incRef();
+ }
+ FileCache<T> oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
+ if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+ if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+ // Someone created one in parallel and then it went stale.
+ if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
+ // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+ }
+ }
+}
\ No newline at end of file
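The per-file reference count above doubles as a lightweight try-write-lock: lookups bracket their work with incRef()/decRef(), while the cleanup thread uses startEvicting()/commitEvicting() so a sub-cache is removed only when nobody else holds it. A minimal usage sketch of the shared helper, assuming code in the same package and a placeholder value type (a skip-list map of strings rather than real buffers); none of these names are from the patch:

    ConcurrentHashMap<Object, FileCache<ConcurrentSkipListMap<Long, String>>> files =
        new ConcurrentHashMap<>();
    Function<Void, ConcurrentSkipListMap<Long, String>> ctor =
        new Function<Void, ConcurrentSkipListMap<Long, String>>() {
          @Override
          public ConcurrentSkipListMap<Long, String> apply(Void input) {
            return new ConcurrentSkipListMap<>();
          }
        };
    FileCache<ConcurrentSkipListMap<Long, String>> fc =
        FileCache.getOrAddFileSubCache(files, "some-file-key", ctor); // returned already incRef-ed
    try {
      fc.getCache().put(100L, "data at offset 100");
    } finally {
      fc.decRef(); // callers must release their reference, as LowLevelCacheImpl does
    }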
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
new file mode 100644
index 0000000..17c7ee6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hive.common.util.Ref;
+
+/** Class used to slowly clean up a map of FileCache-s. */
+abstract class FileCacheCleanupThread<T> extends Thread {
+ private final long approxCleanupIntervalSec;
+ private final AtomicInteger newEvictions;
+ private final ConcurrentHashMap<Object, FileCache<T>> fileMap;
+
+ public FileCacheCleanupThread(String name, ConcurrentHashMap<Object, FileCache<T>> fileMap,
+ AtomicInteger newEvictions, long cleanupInterval) {
+ super(name);
+ this.fileMap = fileMap;
+ this.newEvictions = newEvictions;
+ this.approxCleanupIntervalSec = cleanupInterval;
+ setDaemon(true);
+ setPriority(1);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ doOneCleanupRound();
+ } catch (InterruptedException ex) {
+ LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+ break;
+ }
+ }
+ }
+
+ private void doOneCleanupRound() throws InterruptedException {
+ while (true) {
+ int evictionsSinceLast = newEvictions.getAndSet(0);
+ if (evictionsSinceLast > 0) break;
+ synchronized (newEvictions) {
+ newEvictions.wait(10000);
+ }
+ }
+ // Duration is an estimate; if the size of the map changes, it can be very different.
+ long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
+ int leftToCheck = 0; // approximate
+ for (FileCache<T> fc : fileMap.values()) {
+ leftToCheck += getCacheSize(fc);
+ }
+ // Iterate thru all the filecaches. This is best-effort.
+ // If these super-long-lived iterators affect the map in some bad way,
+ // we'd need to sleep once per round instead.
+ Iterator<Map.Entry<Object, FileCache<T>>> iter = fileMap.entrySet().iterator();
+ Ref<Boolean> isPastEndTime = Ref.from(false);
+ while (iter.hasNext()) {
+ FileCache<T> fc = iter.next().getValue();
+ if (!fc.incRef()) {
+ throw new AssertionError("Something other than cleanup is removing elements from map");
+ }
+ leftToCheck = cleanUpOneFileCache(fc, leftToCheck, endTime, isPastEndTime);
+ if (getCacheSize(fc) > 0) {
+ fc.decRef();
+ continue;
+ }
+ // FileCache might be empty; see if we can remove it. "tryWriteLock"
+ if (!fc.startEvicting()) continue;
+ if (getCacheSize(fc) == 0) {
+ fc.commitEvicting();
+ iter.remove();
+ } else {
+ fc.abortEvicting();
+ }
+ }
+ }
+
+ protected abstract int getCacheSize(FileCache<T> fc);
+
+ protected abstract int cleanUpOneFileCache(FileCache<T> fc, int leftToCheck, long endTime,
+ Ref<Boolean> isPastEndTime) throws InterruptedException;
+}
\ No newline at end of file
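The abstract class above paces one cleanup pass so that a full sweep takes roughly approxCleanupIntervalSec. A worked example of the per-entry sleep that the concrete cleanUpOneFileCache implementations use (the same formula appears in LowLevelCacheImpl.CleanupThread later in this patch); the entry count here is made up:

    // Sleep budget per entry checked; mirrors the pacing formula used in cleanUpOneFileCache.
    long cleanupIntervalSec = 600;                              // DEFAULT_CLEANUP_INTERVAL
    long endTime = System.nanoTime() + cleanupIntervalSec * 1000000000L;
    int leftToCheck = 200000;                                   // entries across all file caches
    long now = System.nanoTime();
    long sleepMs = (leftToCheck <= 0 || now >= endTime)
        ? 1 : (endTime - now) / (1000000L * leftToCheck);
    Thread.sleep(sleepMs);                                      // ~600s / 200000 entries = ~3 ms each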
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index d1a961c..7d5c101 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -139,4 +139,11 @@ public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryB
int refCount = this.refCount.get();
return "0x" + Integer.toHexString(System.identityHashCode(this)) + "(" + refCount + ")";
}
+
+ public static String toDataString(MemoryBuffer s) {
+ if (s == null || s.getByteBufferRaw().remaining() == 0) return "" + s;
+ byte b = s.getByteBufferRaw().get(s.getByteBufferRaw().position());
+ int i = (b < 0) ? -b : b;
+ return s + " (0x" + Integer.toHexString(i) + ")";
+ }
}
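The new helper is a debug aid: it appends a hex rendering of the buffer's first byte to its default toString. A hypothetical call site, reusing the trace logger that appears elsewhere in this patch:

    // "buf" is any MemoryBuffer, e.g. an LlapDataBuffer; purely illustrative.
    LlapIoImpl.CACHE_LOGGER.trace("Using {}", LlapDataBuffer.toDataString(buf));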
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index ea458ca..72980ae 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -32,33 +32,45 @@ import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
- private final EvictionAwareAllocator allocator;
+ private final Allocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
private Thread cleanupThread = null;
- private final ConcurrentHashMap<Object, FileCache> cache =
- new ConcurrentHashMap<Object, FileCache>();
+ // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
+ // In fact, CSLM has slow single-threaded operation, and one file is probably often read
+ // by just one (or few) threads, so a much more simple DS with locking might be better.
+ // Let's use CSLM for now, since it's available.
+ private final ConcurrentHashMap<Object,
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> cache = new ConcurrentHashMap<>();
private final LowLevelCachePolicy cachePolicy;
private final long cleanupInterval;
private final LlapDaemonCacheMetrics metrics;
private final boolean doAssumeGranularBlocks;
+ private static final Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>> CACHE_CTOR =
+ new Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>>() {
+ @Override
+ public ConcurrentSkipListMap<Long, LlapDataBuffer> apply(Void input) {
+ return new ConcurrentSkipListMap<>();
+ }
+ };
+
public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
- EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks) {
+ Allocator allocator, boolean doAssumeGranularBlocks) {
this(metrics, cachePolicy, allocator, doAssumeGranularBlocks, DEFAULT_CLEANUP_INTERVAL);
}
@VisibleForTesting
LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
- EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
-
+ Allocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval);
this.cachePolicy = cachePolicy;
this.allocator = allocator;
@@ -69,7 +81,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
public void startThreads() {
if (cleanupInterval < 0) return;
- cleanupThread = new CleanupThread(cleanupInterval);
+ cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
cleanupThread.start();
}
@@ -78,7 +90,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
if (ranges == null) return null;
DiskRangeList prev = ranges.prev;
- FileCache subCache = cache.get(fileKey);
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache = cache.get(fileKey);
if (subCache == null || !subCache.incRef()) {
long totalMissed = ranges.getTotalLength();
metrics.incrCacheRequestedBytes(totalMissed);
@@ -102,7 +114,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
metrics.incrCacheRequestedBytes(current.getLength());
// We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
DiskRangeList next = current.next;
- getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
+ getOverlappingRanges(baseOffset, current, subCache.getCache(), factory, gotAllData);
current = next;
}
} finally {
@@ -234,7 +246,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
long[] result = null;
assert buffers.length == ranges.length;
- FileCache subCache = getOrAddFileSubCache(fileKey);
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
+ FileCache.getOrAddFileSubCache(cache, fileKey, CACHE_CTOR);
try {
for (int i = 0; i < ranges.length; ++i) {
LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
@@ -247,7 +260,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
buffer.declaredCachedLength = ranges[i].getLength();
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
- LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+ LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer);
if (oldVal == null) {
// Cached successfully, add to policy.
cachePolicy.cache(buffer, priority);
@@ -287,7 +300,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
break;
}
// We found some old value but couldn't incRef it; remove it.
- subCache.cache.remove(offset, oldVal);
+ subCache.getCache().remove(offset, oldVal);
}
}
} finally {
@@ -296,38 +309,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
return result;
}
- /**
- * All this mess is necessary because we want to be able to remove sub-caches for fully
- * evicted files. It may actually be better to have non-nested map with object keys?
- */
- private FileCache getOrAddFileSubCache(Object fileKey) {
- FileCache newSubCache = null;
- while (true) { // Overwhelmingly executes once.
- FileCache subCache = cache.get(fileKey);
- if (subCache != null) {
- if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
- if (newSubCache == null) {
- newSubCache = new FileCache();
- newSubCache.incRef();
- }
- // Found a stale value we cannot incRef; try to replace it with new value.
- if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
- continue; // Someone else replaced/removed a stale value, try again.
- }
- // No value found.
- if (newSubCache == null) {
- newSubCache = new FileCache();
- newSubCache.incRef();
- }
- FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
- if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
- if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
- // Someone created one in parallel and then it went stale.
- if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
- // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
- }
- }
-
private static int align64(int number) {
return ((number + 63) & ~63);
}
@@ -370,134 +351,44 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public final void notifyEvicted(MemoryBuffer buffer) {
- allocator.deallocateEvicted(buffer);
newEvictions.incrementAndGet();
}
- private static class FileCache {
- private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
- // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
- // In fact, CSLM has slow single-threaded operation, and one file is probably often read
- // by just one (or few) threads, so a much more simple DS with locking might be better.
- // Let's use CSLM for now, since it's available.
- private final ConcurrentSkipListMap<Long, LlapDataBuffer> cache
- = new ConcurrentSkipListMap<Long, LlapDataBuffer>();
- private final AtomicInteger refCount = new AtomicInteger(0);
-
- boolean incRef() {
- while (true) {
- int value = refCount.get();
- if (value == EVICTED_REFCOUNT) return false;
- if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
- assert value >= 0;
- if (refCount.compareAndSet(value, value + 1)) return true;
- }
- }
-
- void decRef() {
- int value = refCount.decrementAndGet();
- if (value < 0) {
- throw new AssertionError("Unexpected refCount " + value);
- }
- }
-
- boolean startEvicting() {
- while (true) {
- int value = refCount.get();
- if (value != 1) return false;
- if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
- }
- }
-
- void commitEvicting() {
- boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
- assert result;
- }
-
- void abortEvicting() {
- boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
- assert result;
- }
- }
-
- private final class CleanupThread extends Thread {
- private final long approxCleanupIntervalSec;
+ private static final class CleanupThread
+ extends FileCacheCleanupThread<ConcurrentSkipListMap<Long, LlapDataBuffer>> {
- public CleanupThread(long cleanupInterval) {
- super("Llap low level cache cleanup thread");
- this.approxCleanupIntervalSec = cleanupInterval;
- setDaemon(true);
- setPriority(1);
+ public CleanupThread(ConcurrentHashMap<Object,
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> fileMap,
+ AtomicInteger newEvictions, long cleanupInterval) {
+ super("Llap low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
}
@Override
- public void run() {
- while (true) {
- try {
- doOneCleanupRound();
- } catch (InterruptedException ex) {
- LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
- Thread.currentThread().interrupt();
- break;
- } catch (Throwable t) {
- LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
- break;
- }
- }
+ protected int getCacheSize(FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc) {
+ return fc.getCache().size();
}
- private void doOneCleanupRound() throws InterruptedException {
- while (true) {
- int evictionsSinceLast = newEvictions.getAndSet(0);
- if (evictionsSinceLast > 0) break;
- synchronized (newEvictions) {
- newEvictions.wait(10000);
- }
- }
- // Duration is an estimate; if the size of the map changes, it can be very different.
- long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
- int leftToCheck = 0; // approximate
- for (FileCache fc : cache.values()) {
- leftToCheck += fc.cache.size();
- }
- // Iterate thru all the filecaches. This is best-effort.
- // If these super-long-lived iterators affect the map in some bad way,
- // we'd need to sleep once per round instead.
- Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
- boolean isPastEndTime = false;
- while (iter.hasNext()) {
- FileCache fc = iter.next().getValue();
- if (!fc.incRef()) {
- throw new AssertionError("Something other than cleanup is removing elements from map");
- }
- // Iterate thru the file cache. This is best-effort.
- Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.cache.entrySet().iterator();
- boolean isEmpty = true;
- while (subIter.hasNext()) {
- long time = -1;
- isPastEndTime = isPastEndTime || ((time = System.nanoTime()) >= endTime);
- Thread.sleep(((leftToCheck <= 0) || isPastEndTime)
- ? 1 : (endTime - time) / (1000000L * leftToCheck));
- if (subIter.next().getValue().isInvalid()) {
- subIter.remove();
- } else {
- isEmpty = false;
- }
- --leftToCheck;
- }
- if (!isEmpty) {
- fc.decRef();
- continue;
- }
- // FileCache might be empty; see if we can remove it. "tryWriteLock"
- if (!fc.startEvicting()) continue;
- if (fc.cache.isEmpty()) {
- fc.commitEvicting();
- iter.remove();
+ @Override
+ public int cleanUpOneFileCache(
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc,
+ int leftToCheck, long endTime, Ref<Boolean> isPastEndTime)
+ throws InterruptedException {
+ // Iterate thru the file cache. This is best-effort.
+ Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.getCache().entrySet().iterator();
+ boolean isEmpty = true;
+ while (subIter.hasNext()) {
+ long time = -1;
+ isPastEndTime.value = isPastEndTime.value || ((time = System.nanoTime()) >= endTime);
+ Thread.sleep(((leftToCheck <= 0) || isPastEndTime.value)
+ ? 1 : (endTime - time) / (1000000L * leftToCheck));
+ if (subIter.next().getValue().isInvalid()) {
+ subIter.remove();
} else {
- fc.abortEvicting();
+ isEmpty = false;
}
+ --leftToCheck;
}
+ return leftToCheck;
}
}
@@ -516,11 +407,12 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public String debugDumpForOom() {
StringBuilder sb = new StringBuilder("File cache state ");
- for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
+ for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e :
+ cache.entrySet()) {
if (!e.getValue().incRef()) continue;
try {
sb.append("\n file " + e.getKey());
- for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
+ for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
if (e2.getValue().incRef() < 0) continue;
try {
sb.append("\n [").append(e2.getKey()).append(", ")
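The put path in putFileData above resolves races against concurrently evicted buffers with a putIfAbsent/remove retry loop on the per-file map. A simplified, generic restatement of that loop (not part of the patch): tryLock stands in for the real lockBuffer()/incRef() check, and the names and types are placeholders.

    // Sketch of the "cache or reuse, replacing stale entries" loop from putFileData.
    static <V> V cacheOrGet(ConcurrentSkipListMap<Long, V> perFileMap, long offset, V newVal,
        java.util.function.Predicate<V> tryLock) {
      while (true) { // Overwhelmingly executes once, or maybe twice (replacing a stale value).
        V oldVal = perFileMap.putIfAbsent(offset, newVal);
        if (oldVal == null) return newVal;       // Cached the new buffer.
        if (tryLock.test(oldVal)) return oldVal; // Found a valid existing buffer; reuse it.
        perFileMap.remove(offset, oldVal);       // Stale (evicted) entry; drop it and retry.
      }
    }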
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
new file mode 100644
index 0000000..53e3275
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+
+import com.google.common.base.Function;
+
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump {
+ private static final int DEFAULT_CLEANUP_INTERVAL = 600;
+ private final Allocator allocator;
+ private final AtomicInteger newEvictions = new AtomicInteger(0);
+ private Thread cleanupThread = null;
+ private final ConcurrentHashMap<Object, FileCache<FileData>> cache = new ConcurrentHashMap<>();
+ private final LowLevelCachePolicy cachePolicy;
+ private final long cleanupInterval;
+ private final LlapDaemonCacheMetrics metrics;
+
+ private static final class StripeInfoComparator implements
+ Comparator<StripeData> {
+ @Override
+ public int compare(StripeData o1, StripeData o2) {
+ int starts = Long.compare(o1.knownTornStart, o2.knownTornStart);
+ if (starts != 0) return starts;
+ starts = Long.compare(o1.firstStart, o2.firstStart);
+ if (starts != 0) return starts;
+ assert (o1.lastStart == o2.lastStart) == (o1.lastEnd == o2.lastEnd);
+ return Long.compare(o1.lastStart, o2.lastStart);
+ }
+ }
+
+ public static class FileData {
+ /**
+ * RW lock ensures we have a consistent view of the file data, which is important given that
+ * we generate "stripe" boundaries arbitrarily. Reading buffer data itself doesn't require
+ * that this lock is held; however, everything else in the stripes list does.
+ * TODO: make more granular? We only care that each reader sees consistent boundaries.
+ * So, we could shallow-copy the stripes list, then have individual locks inside each.
+ */
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Object fileKey;
+ private final int colCount;
+ private ArrayList<StripeData> stripes;
+
+ public FileData(Object fileKey, int colCount) {
+ this.fileKey = fileKey;
+ this.colCount = colCount;
+ }
+
+ public void toString(StringBuilder sb) {
+ sb.append("File data for ").append(fileKey).append(" with ").append(colCount)
+ .append(" columns: ").append(stripes);
+ }
+
+ public int getColCount() {
+ return colCount;
+ }
+
+ public ArrayList<StripeData> getData() {
+ return stripes;
+ }
+
+ public void addStripe(StripeData sd) {
+ if (stripes == null) {
+ stripes = new ArrayList<>();
+ }
+ stripes.add(sd);
+ }
+
+ @Override
+ public String toString() {
+ return "[fileKey=" + fileKey + ", colCount=" + colCount + ", stripes=" + stripes + "]";
+ }
+ }
+
+ public static final class StripeData {
+ // In the LRR case, if we just store 2 boundaries (which could be split boundaries or reader
+ // positions), we wouldn't be able to account for torn rows correctly because the semantics of
+ // our "exact" reader positions, and inexact split boundaries, are different. We cannot even
+ // tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file split
+ // wrt first row when caching - we may produce incorrect result if we adjust the split
+ // boundary, and also if we don't adjust it, depending where it falls. At best, we'd end up
+ // with spurious disk reads if we cache on row boundaries but splits include torn rows.
+ // This structure implies that when reading a split, we skip the first torn row but fully
+ // read the last torn row (as LineRecordReader does). If we want to support a different scheme,
+ // we'd need to store more offsets and make logic account for that.
+ private long knownTornStart; // This can change based on new splits.
+ private final long firstStart, lastStart, lastEnd;
+ // TODO: we can actually consider storing ALL the delta encoded row offsets - not a lot of
+ // overhead compared to the data itself, and with row offsets, we could use columnar
+ // blocks for inconsistent splits. We are not optimizing for inconsistent splits for now.
+
+ private final long rowCount;
+ private final OrcProto.ColumnEncoding[] encodings;
+ private LlapDataBuffer[][][] data; // column index, stream type, buffers
+
+ public StripeData(long knownTornStart, long firstStart, long lastStart, long lastEnd,
+ long rowCount, ColumnEncoding[] encodings) {
+ this.knownTornStart = knownTornStart;
+ this.firstStart = firstStart;
+ this.lastStart = lastStart;
+ this.lastEnd = lastEnd;
+ this.encodings = encodings;
+ this.rowCount = rowCount;
+ this.data = encodings == null ? null : new LlapDataBuffer[encodings.length][][];
+ }
+
+ @Override
+ public String toString() {
+ return toCoordinateString() + " with encodings [" + Arrays.toString(encodings)
+ .replace('\n', ' ') + "] and data " + SerDeLowLevelCacheImpl.toString(data);
+ }
+
+ public long getKnownTornStart() {
+ return knownTornStart;
+ }
+
+ public long getFirstStart() {
+ return firstStart;
+ }
+
+ public long getLastStart() {
+ return lastStart;
+ }
+
+ public long getLastEnd() {
+ return lastEnd;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public OrcProto.ColumnEncoding[] getEncodings() {
+ return encodings;
+ }
+
+ public LlapDataBuffer[][][] getData() {
+ return data;
+ }
+
+ public String toCoordinateString() {
+ return "stripe kts " + knownTornStart + " from "
+ + firstStart + " to [" + lastStart + ", " + lastEnd + ")";
+ }
+
+ public static StripeData duplicateForResults(StripeData s) {
+ return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
+ s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
+ }
+
+ public void setKnownTornStart(long value) {
+ knownTornStart = value;
+ }
+ }
+
+ public static String toString(LlapDataBuffer[][][] data) {
+ if (data == null) return "null";
+ StringBuilder sb = new StringBuilder("[");
+ for (int i = 0; i < data.length; ++i) {
+ LlapDataBuffer[][] colData = data[i];
+ if (colData == null) {
+ sb.append("null, ");
+ continue;
+ }
+ sb.append("[");
+ for (int j = 0; j < colData.length; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) {
+ sb.append("null, ");
+ continue;
+ }
+ sb.append("[");
+ for (int k = 0; k < streamData.length; ++k) {
+ LlapDataBuffer s = streamData[k];
+ sb.append(LlapDataBuffer.toDataString(s));
+ }
+ sb.append("], ");
+ }
+ sb.append("], ");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+
+ public static String toString(LlapDataBuffer[][] data) {
+ if (data == null) return "null";
+ StringBuilder sb = new StringBuilder("[");
+ for (int j = 0; j < data.length; ++j) {
+ LlapDataBuffer[] streamData = data[j];
+ if (streamData == null) {
+ sb.append("null, ");
+ continue;
+ }
+ sb.append("[");
+ for (int k = 0; k < streamData.length; ++k) {
+ LlapDataBuffer s = streamData[k];
+ sb.append(LlapDataBuffer.toDataString(s));
+ }
+ sb.append("], ");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ public SerDeLowLevelCacheImpl(
+ LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, Allocator allocator) {
+ this.cachePolicy = cachePolicy;
+ this.allocator = allocator;
+ this.cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
+ this.metrics = metrics;
+ LlapIoImpl.LOG.info("SerDe low-level level cache; cleanup interval {} sec", cleanupInterval);
+ }
+
+ public void startThreads() {
+ if (cleanupInterval < 0) return;
+ cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
+ cleanupThread.start();
+ }
+
+ public FileData getFileData(Object fileKey, long start, long end, boolean[] includes,
+ DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData)
+ throws IOException {
+ FileCache<FileData> subCache = cache.get(fileKey);
+ if (subCache == null || !subCache.incRef()) {
+ LlapIoImpl.CACHE_LOGGER.info("TODO# cannot find cache for " + fileKey + " in " + cache);
+ markAllAsMissed(start, end, qfCounters, gotAllData);
+ return null;
+ }
+
+ try {
+ FileData cached = subCache.getCache();
+ cached.rwLock.readLock().lock();
+ LlapIoImpl.CACHE_LOGGER.info(("TODO# cache for " + fileKey + " is " + subCache.getCache()).replace('\n', ' '));
+ try {
+ if (cached.stripes == null) {
+ LlapIoImpl.CACHE_LOGGER.info("TODO# cannot find any stripes for " + fileKey);
+ markAllAsMissed(start, end, qfCounters, gotAllData);
+ return null;
+ }
+ if (includes.length > cached.colCount) {
+ throw new IOException("Includes " + DebugUtils.toString(includes) + " for "
+ + cached.colCount + " columns");
+ }
+ FileData result = new FileData(cached.fileKey, cached.colCount);
+ if (gotAllData != null) {
+ gotAllData.value = true;
+ }
+ // We will adjust start and end so that we could record the metrics; save the originals.
+ long origStart = start, origEnd = end;
+ // startIx is inclusive, endIx is exclusive.
+ int startIx = Integer.MIN_VALUE, endIx = Integer.MIN_VALUE;
+ LlapIoImpl.CACHE_LOGGER.info("TODO# Looking for data between " + start + " and " + end);
+ for (int i = 0; i < cached.stripes.size() && endIx == Integer.MIN_VALUE; ++i) {
+ StripeData si = cached.stripes.get(i);
+ LlapIoImpl.CACHE_LOGGER.info("TODO# looking at " + si.toCoordinateString());
+
+ if (startIx == i) {
+ // The start of the split was in the middle of the previous slice.
+ start = si.knownTornStart;
+ } else if (startIx == Integer.MIN_VALUE) {
+ // Determine if we need to read this slice for the split.
+ if (si.lastEnd <= start) continue; // Slice before the start of the split.
+ // Start of the split falls somewhere within or before this slice.
+ // Note the ">=" - LineRecordReader will skip the first row even if we start
+ // directly at its start, because it cannot know if it's the start or not.
+ // Unless it's 0; note that we DO give 0 special treatment here, unlike the EOF below,
+ // because zero is zero. Need to mention it in Javadoc.
+ if (start == 0 && si.firstStart == 0) {
+ startIx = i;
+ } else if (start >= si.firstStart) {
+ // If the start of the split points into the middle of the cached slice, we cannot
+ // use the cached block - it's encoded and columnar, so we cannot map the file
+ // offset to some "offset" in "middle" of the slice (but see TODO for firstStart).
+ startIx = i + 1;
+ // continue;
+ } else {
+ // Start of the split is before this slice.
+ startIx = i; // Simple case - we will read cache from the split start offset.
+ start = si.knownTornStart;
+ }
+ }
+
+ // Determine if this (or previous) is the last slice we need to read for this split.
+ if (startIx != Integer.MIN_VALUE && endIx == Integer.MIN_VALUE) {
+ if (si.lastEnd <= end) {
+ // The entire current slice is part of the split. Note that if split end EQUALS
+ // lastEnd, the split would also read the next row, so we do need to look at the
+ // next slice, if any (although we'd probably find we cannot use it).
+ // Note also that we DO NOT treat end-of-file differently here, cause we do not know
+ // of any such thing. The caller must handle lastEnd vs end of split vs end of file
+ // match correctly in terms of how LRR handles them. See above for start-of-file.
+ if (i + 1 != cached.stripes.size()) continue;
+ endIx = i + 1;
+ end = si.lastEnd;
+ } else if (si.lastStart <= end) {
+ // The split ends within (and would read) the last row of this slice. Exact match.
+ endIx = i + 1;
+ end = si.lastEnd;
+ } else {
+ // Either the slice comes entirely after the end of split (following a gap in cached
+ // data); or the split ends in the middle of the slice, so it's the same as in the
+ // startIx logic w.r.t. the partial match; so, we either don't want to, or cannot,
+ // use this. There's no need to distinguish these two cases for now.
+ endIx = i;
+ end = (endIx > 0) ? cached.stripes.get(endIx - 1).lastEnd : start;
+ }
+ }
+ }
+ LlapIoImpl.CACHE_LOGGER.info("TODO# determined stripe indexes " + startIx + ", " + endIx);
+ if (endIx <= startIx) {
+ if (gotAllData != null) {
+ gotAllData.value = false;
+ }
+ return null; // No data for the split, or it fits in the middle of one or two slices.
+ }
+ if (start > origStart || end < origEnd) {
+ if (gotAllData != null) {
+ gotAllData.value = false;
+ }
+ long totalMiss = Math.max(0, origEnd - end) + Math.max(0, start - origStart);
+ metrics.incrCacheRequestedBytes(totalMiss);
+ if (qfCounters != null) {
+ qfCounters.recordCacheMiss(totalMiss);
+ }
+ }
+
+ result.stripes = new ArrayList<>(endIx - startIx);
+ for (int stripeIx = startIx; stripeIx < endIx; ++stripeIx) {
+ getCacheDataForOneSlice(stripeIx, cached, result, gotAllData, includes, qfCounters);
+ }
+ return result;
+ } finally {
+ cached.rwLock.readLock().unlock();
+ }
+ } finally {
+ subCache.decRef();
+ }
+ }
+
+
+ private void getCacheDataForOneSlice(int stripeIx, FileData cached, FileData result,
+ BooleanRef gotAllData, boolean[] includes, LowLevelCacheCounters qfCounters) {
+ StripeData cStripe = cached.stripes.get(stripeIx);
+ LlapIoImpl.CACHE_LOGGER.info("TODO# got stripe in cache " + cStripe);
+ StripeData stripe = StripeData.duplicateForResults(cStripe);
+ result.stripes.add(stripe);
+ boolean isMissed = false;
+ for (int colIx = 0; colIx < cached.colCount; ++colIx) {
+ if (!includes[colIx]) continue;
+ if (cStripe.encodings[colIx] == null || cStripe.data[colIx] == null) {
+ assert cStripe.data[colIx] == null; // No encoding => must have no data.
+ isMissed = true;
+ if (gotAllData != null) {
+ gotAllData.value = false;
+ }
+ continue;
+ }
+ stripe.encodings[colIx] = cStripe.encodings[colIx];
+ LlapDataBuffer[][] cColData = cStripe.data[colIx];
+ assert cColData != null;
+ for (int streamIx = 0;
+ cColData != null && streamIx < cColData.length; ++streamIx) {
+ LlapDataBuffer[] streamData = cColData[streamIx];
+ // Note: this relies on the fact that we always evict the entire column, so if
+ // we have the column data, we assume we have all the streams we need.
+ if (streamData == null) continue;
+ for (int i = 0; i < streamData.length; ++i) { // Finally, we are going to use "i"!
+ if (!lockBuffer(streamData[i], true)) {
+ LlapIoImpl.CACHE_LOGGER.info("TODO# couldn't lock data for stripe at "
+ + stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
+
+ cColData = null;
+ isMissed = true;
+ handleRemovedColumnData(cColData);
+ if (gotAllData != null) {
+ gotAllData.value = false;
+ }
+ break;
+ }
+ }
+ }
+ // At this point, we have arrived at the level where we need all the data, and the
+ // arrays never change. So we will just do a shallow assignment here instead of copy.
+ stripe.data[colIx] = cColData;
+ }
+ doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
+ }
+
+
+ private void doMetricsStuffForOneSlice(
+ LowLevelCacheCounters qfCounters, StripeData stripe, boolean isMissed) {
+ // Slice boundaries may not match split boundaries due to torn rows in either direction,
+ // so this counter may not be consistent with splits. This is also why we increment
+ // requested bytes here, instead of based on the split - we don't want the metrics to be
+ // inconsistent with each other. No matter what we determine here, at least we'll account
+ // for both in the same manner.
+ long bytes = stripe.lastEnd - stripe.knownTornStart;
+ metrics.incrCacheRequestedBytes(bytes);
+ if (!isMissed) {
+ metrics.incrCacheHitBytes(bytes);
+ }
+ if (qfCounters != null) {
+ if (isMissed) {
+ qfCounters.recordCacheMiss(bytes);
+ } else {
+ qfCounters.recordCacheHit(bytes);
+ }
+ }
+ }
+
+ private void markAllAsMissed(long from, long to,
+ LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
+ if (qfCounters != null) {
+ metrics.incrCacheRequestedBytes(to - from);
+ qfCounters.recordCacheMiss(to - from);
+ }
+ if (gotAllData != null) {
+ gotAllData.value = false;
+ }
+ }
+
+ private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
+ int rc = buffer.incRef();
+ if (rc > 0) {
+ metrics.incrCacheNumLockedBuffers();
+ }
+ if (doNotifyPolicy && rc == 1) {
+ // We have just locked a buffer that wasn't previously locked.
+ cachePolicy.notifyLock(buffer);
+ }
+ return rc > 0;
+ }
+
+ public void putFileData(final FileData data, Priority priority,
+ LowLevelCacheCounters qfCounters) {
+ // TODO: buffers are accounted for at allocation time, but ideally we should report the memory
+ // overhead from the java objects to memory manager and remove it when discarding file.
+ if (data.stripes == null || data.stripes.isEmpty()) {
+ LlapIoImpl.LOG.warn("Trying to cache FileData with no data for " + data.fileKey);
+ return;
+ }
+ FileCache<FileData> subCache = null;
+ FileData cached = null;
+ data.rwLock.writeLock().lock();
+ try {
+ subCache = FileCache.getOrAddFileSubCache(
+ cache, data.fileKey, new Function<Void, FileData>() {
+ @Override
+ public FileData apply(Void input) {
+ return data; // If we don't have a file cache, we will add this one as is.
+ }
+ });
+ cached = subCache.getCache();
+ } finally {
+ if (data != cached) {
+ data.rwLock.writeLock().unlock();
+ }
+ }
+ try {
+ if (data != cached) {
+ cached.rwLock.writeLock().lock();
+ }
+ try {
+ for (StripeData si : data.stripes) {
+ lockAllBuffersForPut(si, priority);
+ }
+ if (data == cached) {
+ LlapIoImpl.CACHE_LOGGER.info(("TODO# cached new data " + data).replace('\n', ' '));
+ return;
+ }
+ LlapIoImpl.CACHE_LOGGER.info(("TODO# merging old " + cached + " and new " + data).replace('\n', ' '));
+ ArrayList<StripeData> combined = new ArrayList<>(
+ cached.stripes.size() + data.stripes.size());
+ combined.addAll(cached.stripes);
+ combined.addAll(data.stripes);
+ Collections.sort(combined, new StripeInfoComparator());
+ int lastIx = combined.size() - 1;
+ for (int ix = 0; ix < lastIx; ++ix) {
+ StripeData cur = combined.get(ix), next = combined.get(ix + 1);
+ if (cur.lastEnd <= next.firstStart) continue; // All good.
+ if (cur.firstStart == next.firstStart && cur.lastEnd == next.lastEnd) {
+ mergeStripeInfos(cur, next);
+ combined.remove(ix + 1);
+ --lastIx;
+ // Don't recheck with next, only 2 lists each w/o collisions.
+ continue;
+ }
+ // The original lists do not contain collisions, so only one is 'old'.
+ boolean isCurOriginal = cached.stripes.contains(cur);
+ handleRemovedStripeInfo(combined.remove(isCurOriginal ? ix : ix + 1));
+ --ix;
+ --lastIx;
+ }
+ cached.stripes = combined;
+ LlapIoImpl.CACHE_LOGGER.info(("TODO# new cache data is " + combined).replace('\n', ' '));
+
+ } finally {
+ cached.rwLock.writeLock().unlock();
+ }
+ } finally {
+ subCache.decRef();
+ }
+ }
+
+ private void lockAllBuffersForPut(StripeData si, Priority priority) {
+ for (int i = 0; i < si.data.length; ++i) {
+ LlapDataBuffer[][] colData = si.data[i];
+ if (colData == null) continue;
+ for (int j = 0; j < colData.length; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) continue;
+ for (int k = 0; k < streamData.length; ++k) {
+ boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
+ assert canLock;
+ /*LlapIoImpl.LOG.info("TODO# Calling cache on "
+ + System.identityHashCode(streamData[k]) + ": " + i + ", " + j + ", " + k);*/
+ cachePolicy.cache(streamData[k], priority);
+ streamData[k].declaredCachedLength = streamData[k].getByteBufferRaw().remaining();
+ }
+ }
+ }
+ }
+
+ private void handleRemovedStripeInfo(StripeData removed) {
+ for (LlapDataBuffer[][] colData : removed.data) {
+ handleRemovedColumnData(colData);
+ }
+ }
+
+ private void handleRemovedColumnData(LlapDataBuffer[][] removed) {
+ // TODO: could we tell the policy that we don't care about these and have them evicted? or we
+ // could just deallocate them when unlocked, and free memory + handle that in eviction.
+ // For now, just abandon the blocks - eventually, they'll get evicted.
+ }
+
+ private void mergeStripeInfos(StripeData to, StripeData from) {
+ LlapIoImpl.CACHE_LOGGER.info("TODO# merging slices data: old " + to + " and new " + from);
+ to.knownTornStart = Math.min(to.knownTornStart, from.knownTornStart);
+ if (from.encodings.length != to.encodings.length) {
+ throw new RuntimeException("Different encodings " + from + "; " + to);
+ }
+ for (int colIx = 0; colIx < from.encodings.length; ++colIx) {
+ if (to.encodings[colIx] == null) {
+ to.encodings[colIx] = from.encodings[colIx];
+ } else if (from.encodings[colIx] != null
+ && !to.encodings[colIx].equals(from.encodings[colIx])) {
+ throw new RuntimeException("Different encodings at " + colIx + ": " + from + "; " + to);
+ }
+ LlapDataBuffer[][] fromColData = from.data[colIx];
+ if (fromColData != null) {
+ if (to.data[colIx] != null) {
+ // Note: we assume here that the data that was returned to the caller from cache will not
+ // be passed back in via put. Right now it's safe since we don't do anything. But if we
+ // evict proactively, we will have to compare objects all the way down.
+ handleRemovedColumnData(to.data[colIx]);
+ }
+ to.data[colIx] = fromColData;
+ }
+ }
+ }
+
+ @Override
+ public void decRefBuffer(MemoryBuffer buffer) {
+ unlockBuffer((LlapDataBuffer)buffer, true);
+ }
+
+ @Override
+ public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+ for (MemoryBuffer b : cacheBuffers) {
+ unlockBuffer((LlapDataBuffer)b, true);
+ }
+ }
+
+ private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
+ boolean isLastDecref = (buffer.decRef() == 0);
+ if (handleLastDecRef && isLastDecref) {
+ // This is kind of not pretty, but this is how we detect whether buffer was cached.
+ // We would always set this for lookups at put time.
+ if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
+ cachePolicy.notifyUnlock(buffer);
+ } else {
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
+ }
+ allocator.deallocate(buffer);
+ }
+ }
+ metrics.decrCacheNumLockedBuffers();
+ }
+
+ private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
+ public static LlapDataBuffer allocateFake() {
+ LlapDataBuffer fake = new LlapDataBuffer();
+ fake.initialize(-1, fakeBuf, 0, 1);
+ return fake;
+ }
+
+ public final void notifyEvicted(MemoryBuffer buffer) {
+ newEvictions.incrementAndGet();
+ }
+
+ private final class CleanupThread extends FileCacheCleanupThread<FileData> {
+
+ public CleanupThread(ConcurrentHashMap<Object, FileCache<FileData>> fileMap,
+ AtomicInteger newEvictions, long cleanupInterval) {
+ super("Llap serde low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
+ }
+
+ @Override
+ protected int getCacheSize(FileCache<FileData> fc) {
+ return 1; // Each iteration cleans the file cache as a single unit (unlike the ORC cache).
+ }
+
+ @Override
+ public int cleanUpOneFileCache(FileCache<FileData> fc, int leftToCheck, long endTime,
+ Ref<Boolean> isPastEndTime) throws InterruptedException {
+ FileData fd = fc.getCache();
+ fd.rwLock.writeLock().lock();
+ try {
+ for (StripeData sd : fd.stripes) {
+ for (int colIx = 0; colIx < sd.data.length; ++colIx) {
+ LlapDataBuffer[][] colData = sd.data[colIx];
+ if (colData == null) continue;
+ boolean hasAllData = true;
+ for (int j = 0; (j < colData.length) && hasAllData; ++j) {
+ LlapDataBuffer[] streamData = colData[j];
+ if (streamData == null) continue;
+ for (int k = 0; k < streamData.length; ++k) {
+ LlapDataBuffer buf = streamData[k];
+ hasAllData = hasAllData && lockBuffer(buf, false);
+ if (!hasAllData) break;
+ unlockBuffer(buf, true);
+ }
+ }
+ if (!hasAllData) {
+ handleRemovedColumnData(colData);
+ sd.data[colIx] = null;
+ }
+ }
+ }
+ } finally {
+ fd.rwLock.writeLock().unlock();
+ }
+ return leftToCheck - 1;
+ }
+ }
+
+ @Override
+ public boolean incRefBuffer(MemoryBuffer buffer) {
+ // notifyReused implies that buffer is already locked; it's also called once for new
+ // buffers that are not cached yet. Don't notify cache policy.
+ return lockBuffer(((LlapDataBuffer)buffer), false);
+ }
+
+ @Override
+ public Allocator getAllocator() {
+ return allocator;
+ }
+
+ @Override
+ public String debugDumpForOom() {
+ StringBuilder sb = new StringBuilder("File cache state ");
+ for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
+ if (!e.getValue().incRef()) continue;
+ try {
+ sb.append("\n file " + e.getKey());
+ sb.append("\n [");
+ e.getValue().getCache().toString(sb);
+ sb.append("]");
+ } finally {
+ e.getValue().decRef();
+ }
+ }
+ return sb.toString();
+ }
+}
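As a small illustration of the slice coordinates that getFileData works with (not part of the patch; the offsets, row counts, three-column encodings array and split boundaries below are made up):

    // Two adjacent cached slices of a text file, in the coordinate convention documented above:
    // knownTornStart <= firstStart, rows span [firstStart, lastEnd), lastStart begins the last row.
    OrcProto.ColumnEncoding[] encodings = new OrcProto.ColumnEncoding[3]; // 3 columns, not yet set
    StripeData s0 = new StripeData(0, 0, 4980, 5000, 100, encodings);
    StripeData s1 = new StripeData(5000, 5000, 9950, 10000, 98, encodings);
    // s0.toCoordinateString() -> "stripe kts 0 from 0 to [4980, 5000)"
    // Per the lookup logic in getFileData, a split [0, 4990) is served entirely by s0:
    // start 0 hits the zero special case, and the split end falls inside s0's last row,
    // so "end" is extended to s0's lastEnd (5000), matching how LineRecordReader fully
    // reads the last torn row.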
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 290624d..ac9c1da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -27,21 +27,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -53,15 +46,13 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
@@ -89,65 +80,77 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
@SuppressWarnings("rawtypes")
private final InputFormat sourceInputFormat;
private final AvoidSplitCombination sourceASC;
- private final ColumnVectorProducer cvp;
- private final ExecutorService executor;
+ @SuppressWarnings("deprecation")
+ private final Deserializer sourceSerDe;
+ final ColumnVectorProducer cvp;
+ final ExecutorService executor;
private final String hostName;
@SuppressWarnings("rawtypes")
- LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
- ExecutorService executor) {
- // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
- // We'd need to plumb it thru and use it to get data to cache/etc.
- assert sourceInputFormat instanceof OrcInputFormat;
+ LlapInputFormat(InputFormat sourceInputFormat, Deserializer sourceSerDe,
+ ColumnVectorProducer cvp, ExecutorService executor) {
this.executor = executor;
this.cvp = cvp;
this.sourceInputFormat = sourceInputFormat;
this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
? (AvoidSplitCombination)sourceInputFormat : null;
+ this.sourceSerDe = sourceSerDe;
this.hostName = HiveStringUtils.getHostname();
}
@Override
public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
- boolean useLlapIo = true;
- if (split instanceof LlapAwareSplit) {
- useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
- }
+ RecordReader<NullWritable, VectorizedRowBatch> noLlap = checkLlapSplit(split, job, reporter);
+ if (noLlap != null) return noLlap;
+
boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
- if (!useLlapIo) {
- LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
- return sourceInputFormat.getRecordReader(split, job, reporter);
- }
FileSplit fileSplit = (FileSplit) split;
reporter.setStatus(fileSplit.toString());
try {
List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
? null : ColumnProjectionUtils.getReadColumnIDs(job);
- LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
-
+ RecordReader<?, ?> sourceRr = null;
+ LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName, cvp,
+ executor, sourceInputFormat, sourceSerDe, reporter);
if (!rr.init()) {
return sourceInputFormat.getRecordReader(split, job, reporter);
}
- // vectorized row batch reader
- if (isVectorized) {
- return rr;
- }
-
- // row batch to row-by-row reader
- if (sourceInputFormat instanceof BatchToRowInputFormat) {
- return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
- rr, rr.getVectorizedRowBatchCtx(), includedCols));
- }
-
- return sourceInputFormat.getRecordReader(split, job, reporter);
+ return wrapLlapReader(isVectorized, includedCols, rr, split, job, reporter);
} catch (Exception ex) {
throw new IOException(ex);
}
}
+ public RecordReader<NullWritable, VectorizedRowBatch> wrapLlapReader(
+ boolean isVectorized, List<Integer> includedCols, LlapRecordReader rr,
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ // vectorized row batch reader
+ if (isVectorized) {
+ return rr;
+ } else if (sourceInputFormat instanceof BatchToRowInputFormat) {
+ return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
+ rr, rr.getVectorizedRowBatchCtx(), includedCols));
+ } else {
+ LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+ return sourceInputFormat.getRecordReader(split, job, reporter);
+ }
+ }
+
+ public RecordReader<NullWritable, VectorizedRowBatch> checkLlapSplit(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ boolean useLlapIo = true;
+ if (split instanceof LlapAwareSplit) {
+ useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
+ }
+ if (useLlapIo) return null;
+
+ LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+ return sourceInputFormat.getRecordReader(split, job, reporter);
+ }
+
// Returning either a vectorized or non-vectorized reader from the same call requires breaking
// generics... this is how vectorization currently works.
@SuppressWarnings("unchecked")
@@ -160,276 +163,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
return sourceInputFormat.getSplits(job, numSplits);
}
- private class LlapRecordReader
- implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
- private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
- private final FileSplit split;
- private final List<Integer> columnIds;
- private final SearchArgument sarg;
- private final String[] columnNames;
- private final VectorizedRowBatchCtx rbCtx;
- private final Object[] partitionValues;
-
- private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
- private ColumnVectorBatch lastCvb = null;
- private boolean isFirst = true;
-
- private Throwable pendingError = null;
- /** Vector that is currently being processed by our user. */
- private boolean isDone = false;
- private final boolean isClosed = false;
- private final ConsumerFeedback<ColumnVectorBatch> feedback;
- private final QueryFragmentCounters counters;
- private long firstReturnTime;
-
- private final JobConf jobConf;
- private final TypeDescription fileSchema;
- private final boolean[] includedColumns;
- private final ReadPipeline rp;
-
- public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
- String hostName) throws IOException, HiveException {
- this.jobConf = job;
- this.split = split;
- this.columnIds = includedCols;
- this.sarg = ConvertAstToSearchArg.createFromConf(job);
- this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
- final String fragmentId = LlapTezUtils.getFragmentId(job);
- final String dagId = LlapTezUtils.getDagId(job);
- final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
- MDC.put("dagId", dagId);
- MDC.put("queryId", queryId);
- TezCounters taskCounters = null;
- if (fragmentId != null) {
- MDC.put("fragmentId", fragmentId);
- taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
- LOG.info("Received fragment id: {}", fragmentId);
- } else {
- LOG.warn("Not using tez counters as fragment id string is null");
- }
- this.counters = new QueryFragmentCounters(job, taskCounters);
- this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
-
- MapWork mapWork = Utilities.getMapWork(job);
- VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
- rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork);
-
- int partitionColumnCount = rbCtx.getPartitionColumnCount();
- if (partitionColumnCount > 0) {
- partitionValues = new Object[partitionColumnCount];
- VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
- } else {
- partitionValues = null;
- }
- boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
- TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(job, isAcidScan, Integer.MAX_VALUE);
-
- // Create the consumer of encoded data; it will coordinate decoding to CVBs.
- rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters, schema);
- feedback = rp;
- fileSchema = rp.getFileSchema();
- includedColumns = rp.getIncludedColumns();
- }
-
- /**
- * Starts the data read pipeline
- */
- public boolean init() {
- SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema,
- rp.getReaderSchema(), includedColumns);
- for (Integer colId : columnIds) {
- if (!schemaEvolution.isPPDSafeConversion(colId)) {
- LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
- return false;
- }
- }
-
- // perform the data read asynchronously
- if (executor instanceof StatsRecordingThreadPool) {
- // Every thread created by this thread pool will use the same handler
- ((StatsRecordingThreadPool) executor)
- .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
- }
- executor.submit(rp.getReadCallable());
- return true;
- }
-
- @Override
- public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
- assert value != null;
- if (isClosed) {
- throw new AssertionError("next called after close");
- }
- // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
- boolean wasFirst = isFirst;
- if (isFirst) {
- if (partitionValues != null) {
- rbCtx.addPartitionColsToBatch(value, partitionValues);
- }
- isFirst = false;
- }
- ColumnVectorBatch cvb = null;
- try {
- cvb = nextCvb();
- } catch (InterruptedException e) {
- // Query might have been canceled. Stop the background processing.
- feedback.stop();
- throw new IOException(e);
- }
- if (cvb == null) {
- if (wasFirst) {
- firstReturnTime = counters.startTimeCounter();
- }
- counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
- return false;
- }
- if (columnIds.size() != cvb.cols.length) {
- throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
- + " included, but the reader returned " + cvb.cols.length);
- }
- // VRB was created from VrbCtx, so we already have pre-allocated column vectors
- for (int i = 0; i < cvb.cols.length; ++i) {
- // Return old CVs (if any) to caller. We assume these things all have the same schema.
- cvb.swapColumnVector(i, value.cols, columnIds.get(i));
- }
- value.selectedInUse = false;
- value.size = cvb.size;
- if (wasFirst) {
- firstReturnTime = counters.startTimeCounter();
- }
- return true;
- }
-
- public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
- return rbCtx;
- }
-
- private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
- " Message: {}", t.getName(), t.getId(), e.getMessage());
- setError(e);
- }
- }
-
- ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
- boolean isFirst = (lastCvb == null);
- if (!isFirst) {
- feedback.returnData(lastCvb);
- }
- synchronized (pendingData) {
- // We are waiting for next block. Either we will get it, or be told we are done.
- boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
- if (doLogBlocking) {
- LlapIoImpl.LOG.trace("next will block");
- }
- while (isNothingToReport()) {
- pendingData.wait(100);
- }
- if (doLogBlocking) {
- LlapIoImpl.LOG.trace("next is unblocked");
- }
- rethrowErrorIfAny();
- lastCvb = pendingData.poll();
- }
- if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
- LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
- }
- return lastCvb;
- }
-
- private boolean isNothingToReport() {
- return !isDone && pendingData.isEmpty() && pendingError == null;
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public VectorizedRowBatch createValue() {
- return rbCtx.createVectorizedRowBatch();
- }
-
- @Override
- public long getPos() throws IOException {
- return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
- }
-
- @Override
- public void close() throws IOException {
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
- feedback.stop();
- rethrowErrorIfAny();
- MDC.clear();
- }
-
- private void rethrowErrorIfAny() throws IOException {
- if (pendingError == null) return;
- if (pendingError instanceof IOException) {
- throw (IOException)pendingError;
- }
- throw new IOException(pendingError);
- }
-
- @Override
- public void setDone() {
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("setDone called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- synchronized (pendingData) {
- isDone = true;
- pendingData.notifyAll();
- }
- }
-
- @Override
- public void consumeData(ColumnVectorBatch data) {
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- synchronized (pendingData) {
- if (isClosed) {
- return;
- }
- pendingData.add(data);
- pendingData.notifyAll();
- }
- }
-
- @Override
- public void setError(Throwable t) {
- counters.incrCounter(LlapIOCounters.NUM_ERRORS);
- LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- assert t != null;
- synchronized (pendingData) {
- pendingError = t;
- pendingData.notifyAll();
- }
- }
-
- @Override
- public float getProgress() throws IOException {
- // TODO: plumb progress info thru the reader if we can get metadata from loader first.
- return 0.0f;
- }
- }
-
@Override
public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
return sourceASC == null ? false : sourceASC.shouldSkipCombine(path, conf);
}
- private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
+ static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
// This is based on Vectorizer code, minus the validation.
// Add all non-virtual columns from the TableScan operator.
@@ -477,5 +216,4 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
}
return tableScanOperator;
}
-
}
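For reference, the record reader deleted above hands batches from the background read pipeline to the calling task through a small wait/notify queue: consumeData(), setDone() and setError() feed pendingData, while nextCvb() drains it under the same lock. A minimal, self-contained sketch of that handoff pattern, using hypothetical names instead of the LLAP-specific ColumnVectorBatch plumbing, could look like this:

import java.io.IOException;
import java.util.LinkedList;

// Illustrative sketch only; the real reader exchanges ColumnVectorBatch objects
// with the LLAP read pipeline and additionally recycles consumed batches.
public class BatchHandoff<T> {
  private final LinkedList<T> pendingData = new LinkedList<>();
  private Throwable pendingError = null;
  private boolean isDone = false;

  /** Producer side: called for every batch produced by the background reader. */
  public void consumeData(T data) {
    synchronized (pendingData) {
      pendingData.add(data);
      pendingData.notifyAll();
    }
  }

  /** Producer side: no more batches will arrive. */
  public void setDone() {
    synchronized (pendingData) {
      isDone = true;
      pendingData.notifyAll();
    }
  }

  /** Producer side: the background read failed; surface the error to the consumer. */
  public void setError(Throwable t) {
    synchronized (pendingData) {
      pendingError = t;
      pendingData.notifyAll();
    }
  }

  /**
   * Consumer side: blocks until a batch, an error, or end-of-data is available.
   * Returns null once the producer has called setDone() and the queue is drained.
   */
  public T next() throws IOException, InterruptedException {
    synchronized (pendingData) {
      while (!isDone && pendingData.isEmpty() && pendingError == null) {
        pendingData.wait(100);
      }
      if (pendingError != null) {
        throw (pendingError instanceof IOException)
            ? (IOException) pendingError : new IOException(pendingError);
      }
      return pendingData.poll(); // null only when done and fully drained
    }
  }
}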
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 8048624..7cfd133 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -47,16 +47,20 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -74,7 +78,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private static final String MODE_CACHE = "cache";
- private final ColumnVectorProducer cvp;
+ // TODO: later, we may have a map
+ private final ColumnVectorProducer orcCvp, genericCvp;
private final ExecutorService executor;
private final LlapDaemonCacheMetrics cacheMetrics;
private final LlapDaemonIOMetrics ioMetrics;
@@ -110,6 +115,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
OrcMetadataCache metadataCache = null;
LowLevelCache cache = null;
+ SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
BufferUsageManager bufferManager = null;
if (useLowLevelCache) {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -124,11 +130,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
this.allocator = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator, true);
+ SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
+ cacheMetrics, cachePolicy, allocator);
cache = cacheImpl;
+ serdeCache = serdeCacheImpl;
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
- cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache));
+ cachePolicy.setEvictionListener(new EvictionDispatcher(
+ cache, serdeCacheImpl, metadataCache, allocator));
cachePolicy.setParentDebugDumper(cacheImpl);
cacheImpl.startThreads(); // Start the cache threads.
bufferManager = cacheImpl; // Cache also serves as buffer manager.
@@ -145,8 +155,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
      // TODO: this should depend on the input format and be in a map, or something.
- this.cvp = new OrcColumnVectorProducer(
+ this.orcCvp = new OrcColumnVectorProducer(
metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics);
+ this.genericCvp = new GenericColumnVectorProducer(
+ serdeCache, bufferManager, conf, cacheMetrics, ioMetrics);
LOG.info("LLAP IO initialized");
registerMXBeans();
@@ -159,8 +171,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
@SuppressWarnings("rawtypes")
@Override
public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
- InputFormat sourceInputFormat) {
- return new LlapInputFormat(sourceInputFormat, cvp, executor);
+ InputFormat sourceInputFormat, Deserializer sourceSerDe) {
+ ColumnVectorProducer cvp = genericCvp;
+ if (sourceInputFormat instanceof OrcInputFormat) {
+ cvp = orcCvp; // Special-case for ORC.
+ }
+ return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor);
}
@Override