Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:09 UTC

[1/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master-15147 [created] 682a3c7b4


http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
index cef765c..35be661 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StreamUtils.java
@@ -21,11 +21,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hive.common.DiskRangeInfo;
+import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.orc.impl.SettableUncompressedStream;
 import org.apache.orc.impl.BufferChunk;
 
+import com.google.common.collect.Lists;
+
 /**
  * Stream utility.
  */
@@ -45,9 +48,13 @@ public class StreamUtils {
       return null;
     }
 
-    DiskRangeInfo diskRangeInfo = createDiskRangeInfo(streamBuffer);
-    return new SettableUncompressedStream(streamName, diskRangeInfo.getDiskRanges(),
-        diskRangeInfo.getTotalLength());
+    if (streamBuffer.getCacheBuffers() != null) {
+      DiskRangeInfo diskRangeInfo = createDiskRangeInfo(streamBuffer);
+      return new SettableUncompressedStream(streamName, diskRangeInfo.getDiskRanges(),
+          diskRangeInfo.getTotalLength());
+    } else {
+      return new SettableUncompressedStream(streamName, Lists.<DiskRange>newArrayList(), 0);
+    }
   }
 
   /**

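The hunk above guards against a ColumnStreamData whose cacheBuffers list is null, which can now happen on the non-columnar path: instead of building DiskRangeInfo from a null list, createStream returns a stream backed by an empty range list of length zero. Below is a minimal, self-contained sketch of that same defensive pattern; the Stream type and the long[] ranges are illustrative stand-ins, not the Hive classes.

import java.util.ArrayList;
import java.util.List;

final class StreamSketch {
  /** Illustrative stand-in for SettableUncompressedStream: a name plus backing ranges. */
  static final class Stream {
    final String name;
    final List<long[]> ranges;
    final long totalLength;
    Stream(String name, List<long[]> ranges, long totalLength) {
      this.name = name;
      this.ranges = ranges;
      this.totalLength = totalLength;
    }
  }

  /** Mirrors the shape of the patched createStream: fall back to an empty stream when there are no cache buffers. */
  static Stream createStream(String name, List<long[]> cacheBuffers) {
    if (cacheBuffers != null) {
      long total = 0;
      for (long[] r : cacheBuffers) {
        total += r[1] - r[0]; // each element is an [offset, end) pair
      }
      return new Stream(name, cacheBuffers, total);
    }
    return new Stream(name, new ArrayList<long[]>(), 0); // empty stream of length zero
  }
}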
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index 09e4a47..9a6406d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -88,8 +88,9 @@ public class PartialScanMapper extends MapReduceBase implements
     }
 
     try {
-      //CombineHiveInputFormat is set in PartialScanTask.
-      RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper) ((CombineHiveKey) k).getKey();
+      //CombineHiveInputFormat may be set in PartialScanTask.
+      RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper)
+          ((k instanceof CombineHiveKey) ?  ((CombineHiveKey) k).getKey() : k);
 
       // calculate rawdatasize
       KeyBuffer keyBuffer = key.getKeyBuffer();

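The change above lets the mapper work whether or not CombineHiveInputFormat wrapped the key: CombineHiveKey is unwrapped only when the key actually is one. The same conditional-unwrap idiom in isolation; Wrapper and Inner are illustrative types, not the Hive CombineHiveKey and RCFileKeyBufferWrapper classes.

final class UnwrapSketch {
  /** Illustrative stand-ins for a wrapping key type and the payload key type. */
  static final class Wrapper {
    private final Object key;
    Wrapper(Object key) { this.key = key; }
    Object getKey() { return key; }
  }
  static final class Inner { }

  /** Unwrap only when the runtime type says a wrapper is actually present. */
  static Inner unwrap(Object k) {
    Object raw = (k instanceof Wrapper) ? ((Wrapper) k).getKey() : k;
    return (Inner) raw; // still fails fast (ClassCastException) if something unexpected arrives
  }

  public static void main(String[] args) {
    Inner direct = new Inner();
    System.out.println(unwrap(direct) == direct);              // true: no wrapper involved
    System.out.println(unwrap(new Wrapper(direct)) == direct); // true: wrapper removed first
  }
}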
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/test/queries/clientpositive/llap_text.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/llap_text.q b/ql/src/test/queries/clientpositive/llap_text.q
new file mode 100644
index 0000000..0ff79e2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/llap_text.q
@@ -0,0 +1,62 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.fetch.task.conversion=none;
+
+SET hive.llap.io.enabled=false;
+set hive.llap.cache.allow.synthetic.fileid=true;
+
+
+DROP TABLE text_llap;
+
+CREATE TABLE text_llap(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN)
+row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ stored as inputformat "org.apache.hadoop.hive.llap.io.decode.LlapTextInputFormat" 
+ outputformat "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+
+alter table text_llap set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe';
+
+
+insert into table text_llap
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null;
+
+create table text_llap1 like text_llap; 
+create table text_llap100 like text_llap; 
+create table text_llap1000 like text_llap; 
+
+insert into table text_llap1
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1;
+
+insert into table text_llap100
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 100;
+
+insert into table text_llap1000
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1000;
+
+
+SET hive.llap.io.enabled=true;
+SET hive.vectorized.execution.enabled=true;
+set hive.llap.io.encode.slice.row.count=90;
+
+select ctinyint, cstring1, cboolean2 from text_llap100 order by ctinyint, cstring1, cboolean2;
+select * from text_llap100 order by cint, cstring1, cstring2;
+select csmallint, cstring1, cboolean2 from text_llap100 order by csmallint, cstring1, cboolean2;
+
+
+DROP TABLE text_llap;

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/test/results/clientpositive/llap_text.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap_text.q.out b/ql/src/test/results/clientpositive/llap_text.q.out
new file mode 100644
index 0000000..5660ef1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap_text.q.out
@@ -0,0 +1,502 @@
+PREHOOK: query: DROP TABLE text_llap
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE text_llap
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE text_llap(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN)
+row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ stored as inputformat "org.apache.hadoop.hive.llap.io.decode.LlapTextInputFormat" 
+ outputformat "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: CREATE TABLE text_llap(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN)
+row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ stored as inputformat "org.apache.hadoop.hive.llap.io.decode.LlapTextInputFormat" 
+ outputformat "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap
+PREHOOK: query: alter table text_llap set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+PREHOOK: type: ALTERTABLE_SERIALIZER
+PREHOOK: Input: default@text_llap
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: alter table text_llap set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+POSTHOOK: type: ALTERTABLE_SERIALIZER
+POSTHOOK: Input: default@text_llap
+POSTHOOK: Output: default@text_llap
+PREHOOK: query: insert into table text_llap
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: insert into table text_llap
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap
+POSTHOOK: Lineage: text_llap.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: create table text_llap1 like text_llap
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap1
+POSTHOOK: query: create table text_llap1 like text_llap
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap1
+PREHOOK: query: create table text_llap100 like text_llap
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap100
+POSTHOOK: query: create table text_llap100 like text_llap
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap100
+PREHOOK: query: create table text_llap1000 like text_llap
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@text_llap1000
+POSTHOOK: query: create table text_llap1000 like text_llap
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@text_llap1000
+PREHOOK: query: insert into table text_llap1
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap1
+POSTHOOK: query: insert into table text_llap1
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap1
+POSTHOOK: Lineage: text_llap1.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap1.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: insert into table text_llap100
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap100
+POSTHOOK: query: insert into table text_llap100
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap100
+POSTHOOK: Lineage: text_llap100.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap100.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap100.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap100.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap100.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap100.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: insert into table text_llap1000
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@text_llap1000
+POSTHOOK: query: insert into table text_llap1000
+select ctinyint, csmallint, cint, cbigint, cfloat, cdouble, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc 
+where cboolean2 is not null or cstring1 is not null or ctinyint is not null limit 1000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@text_llap1000
+POSTHOOK: Lineage: text_llap1000.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: text_llap1000.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+PREHOOK: query: select ctinyint, cstring1, cboolean2 from text_llap100 order by ctinyint, cstring1, cboolean2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+POSTHOOK: query: select ctinyint, cstring1, cboolean2 from text_llap100 order by ctinyint, cstring1, cboolean2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+NULL	cvLH6Eat2yFsyy7p	NULL
+NULL	cvLH6Eat2yFsyy7p	NULL
+-62	cvLH6Eat2yFsyy7p	NULL
+-59	cvLH6Eat2yFsyy7p	NULL
+-57	cvLH6Eat2yFsyy7p	NULL
+-56	cvLH6Eat2yFsyy7p	NULL
+-56	cvLH6Eat2yFsyy7p	NULL
+-55	cvLH6Eat2yFsyy7p	NULL
+-55	cvLH6Eat2yFsyy7p	NULL
+-54	cvLH6Eat2yFsyy7p	NULL
+-53	cvLH6Eat2yFsyy7p	NULL
+-51	cvLH6Eat2yFsyy7p	NULL
+-50	cvLH6Eat2yFsyy7p	NULL
+-48	cvLH6Eat2yFsyy7p	NULL
+-48	cvLH6Eat2yFsyy7p	NULL
+-47	cvLH6Eat2yFsyy7p	NULL
+-45	cvLH6Eat2yFsyy7p	NULL
+-45	cvLH6Eat2yFsyy7p	NULL
+-44	cvLH6Eat2yFsyy7p	NULL
+-43	cvLH6Eat2yFsyy7p	NULL
+-40	cvLH6Eat2yFsyy7p	NULL
+-37	cvLH6Eat2yFsyy7p	NULL
+-36	cvLH6Eat2yFsyy7p	NULL
+-34	cvLH6Eat2yFsyy7p	NULL
+-34	cvLH6Eat2yFsyy7p	NULL
+-33	cvLH6Eat2yFsyy7p	NULL
+-33	cvLH6Eat2yFsyy7p	NULL
+-32	cvLH6Eat2yFsyy7p	NULL
+-30	cvLH6Eat2yFsyy7p	NULL
+-28	cvLH6Eat2yFsyy7p	NULL
+-28	cvLH6Eat2yFsyy7p	NULL
+-24	cvLH6Eat2yFsyy7p	NULL
+-23	cvLH6Eat2yFsyy7p	NULL
+-23	cvLH6Eat2yFsyy7p	NULL
+-23	cvLH6Eat2yFsyy7p	NULL
+-22	cvLH6Eat2yFsyy7p	NULL
+-22	cvLH6Eat2yFsyy7p	NULL
+-22	cvLH6Eat2yFsyy7p	NULL
+-21	cvLH6Eat2yFsyy7p	NULL
+-21	cvLH6Eat2yFsyy7p	NULL
+-19	cvLH6Eat2yFsyy7p	NULL
+-16	cvLH6Eat2yFsyy7p	NULL
+-16	cvLH6Eat2yFsyy7p	NULL
+-13	cvLH6Eat2yFsyy7p	NULL
+-12	cvLH6Eat2yFsyy7p	NULL
+-11	cvLH6Eat2yFsyy7p	NULL
+-11	cvLH6Eat2yFsyy7p	NULL
+-11	cvLH6Eat2yFsyy7p	NULL
+-7	cvLH6Eat2yFsyy7p	NULL
+-5	cvLH6Eat2yFsyy7p	NULL
+-5	cvLH6Eat2yFsyy7p	NULL
+-5	cvLH6Eat2yFsyy7p	NULL
+-4	cvLH6Eat2yFsyy7p	NULL
+-4	cvLH6Eat2yFsyy7p	NULL
+-1	cvLH6Eat2yFsyy7p	NULL
+0	cvLH6Eat2yFsyy7p	NULL
+0	cvLH6Eat2yFsyy7p	NULL
+2	cvLH6Eat2yFsyy7p	NULL
+4	cvLH6Eat2yFsyy7p	NULL
+5	cvLH6Eat2yFsyy7p	NULL
+8	cvLH6Eat2yFsyy7p	NULL
+9	cvLH6Eat2yFsyy7p	NULL
+10	cvLH6Eat2yFsyy7p	NULL
+13	cvLH6Eat2yFsyy7p	NULL
+16	cvLH6Eat2yFsyy7p	NULL
+18	cvLH6Eat2yFsyy7p	NULL
+19	cvLH6Eat2yFsyy7p	NULL
+21	cvLH6Eat2yFsyy7p	NULL
+24	cvLH6Eat2yFsyy7p	NULL
+24	cvLH6Eat2yFsyy7p	NULL
+26	cvLH6Eat2yFsyy7p	NULL
+27	cvLH6Eat2yFsyy7p	NULL
+27	cvLH6Eat2yFsyy7p	NULL
+28	cvLH6Eat2yFsyy7p	NULL
+29	cvLH6Eat2yFsyy7p	NULL
+29	cvLH6Eat2yFsyy7p	NULL
+30	cvLH6Eat2yFsyy7p	NULL
+31	cvLH6Eat2yFsyy7p	NULL
+31	cvLH6Eat2yFsyy7p	NULL
+34	cvLH6Eat2yFsyy7p	NULL
+34	cvLH6Eat2yFsyy7p	NULL
+36	cvLH6Eat2yFsyy7p	NULL
+36	cvLH6Eat2yFsyy7p	NULL
+38	cvLH6Eat2yFsyy7p	NULL
+38	cvLH6Eat2yFsyy7p	NULL
+38	cvLH6Eat2yFsyy7p	NULL
+39	cvLH6Eat2yFsyy7p	NULL
+40	cvLH6Eat2yFsyy7p	NULL
+40	cvLH6Eat2yFsyy7p	NULL
+41	cvLH6Eat2yFsyy7p	NULL
+43	cvLH6Eat2yFsyy7p	NULL
+46	cvLH6Eat2yFsyy7p	NULL
+51	cvLH6Eat2yFsyy7p	NULL
+51	cvLH6Eat2yFsyy7p	NULL
+53	cvLH6Eat2yFsyy7p	NULL
+53	cvLH6Eat2yFsyy7p	NULL
+61	cvLH6Eat2yFsyy7p	NULL
+61	cvLH6Eat2yFsyy7p	NULL
+61	cvLH6Eat2yFsyy7p	NULL
+62	cvLH6Eat2yFsyy7p	NULL
+PREHOOK: query: select * from text_llap100 order by cint, cstring1, cstring2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+POSTHOOK: query: select * from text_llap100 order by cint, cstring1, cstring2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+-50	-13326	528534767	NULL	-50.0	-13326.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:46.674	1969-12-31 16:00:08.875	true	NULL
+NULL	-4213	528534767	NULL	NULL	-4213.0	cvLH6Eat2yFsyy7p	NULL	NULL	1969-12-31 16:00:13.589	true	NULL
+-28	-15813	528534767	NULL	-28.0	-15813.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:55.787	1969-12-31 16:00:01.546	true	NULL
+31	-9566	528534767	NULL	31.0	-9566.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:44.187	1969-12-31 16:00:06.961	true	NULL
+-34	15007	528534767	NULL	-34.0	15007.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:50.434	1969-12-31 16:00:13.352	true	NULL
+29	7021	528534767	NULL	29.0	7021.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:15.007	1969-12-31 16:00:15.148	true	NULL
+31	4963	528534767	NULL	31.0	4963.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:07.021	1969-12-31 16:00:02.997	true	NULL
+27	-7824	528534767	NULL	27.0	-7824.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:04.963	1969-12-31 15:59:56.474	true	NULL
+-11	-15431	528534767	NULL	-11.0	-15431.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.176	1969-12-31 16:00:07.787	true	NULL
+61	-15549	528534767	NULL	61.0	-15549.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:44.569	1969-12-31 15:59:51.665	true	NULL
+16	5780	528534767	NULL	16.0	5780.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:44.451	1969-12-31 16:00:12.752	true	NULL
+5	14625	528534767	NULL	5.0	14625.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:05.78	1969-12-31 16:00:15.34	true	NULL
+-23	13026	528534767	NULL	-23.0	13026.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:14.625	1969-12-31 16:00:10.77	true	NULL
+-51	-12083	528534767	NULL	-51.0	-12083.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:13.026	1969-12-31 16:00:02.52	true	NULL
+-11	9472	528534767	NULL	-11.0	9472.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:47.917	1969-12-31 16:00:03.716	true	NULL
+-48	-7735	528534767	NULL	-48.0	-7735.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:09.472	1969-12-31 16:00:00.8	true	NULL
+-62	10	528534767	NULL	-62.0	10.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.265	1969-12-31 15:59:56.584	true	NULL
+-45	5521	528534767	NULL	-45.0	5521.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:00.01	1969-12-31 15:59:48.553	true	NULL
+40	-1724	528534767	NULL	40.0	-1724.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:05.521	1969-12-31 15:59:57.835	true	NULL
+39	-10909	528534767	NULL	39.0	-10909.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:58.276	1969-12-31 16:00:12.738	true	NULL
+-32	11242	528534767	NULL	-32.0	11242.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:49.091	1969-12-31 15:59:55.681	true	NULL
+-56	8353	528534767	NULL	-56.0	8353.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:11.242	1969-12-31 15:59:46.526	true	NULL
+-7	2541	528534767	NULL	-7.0	2541.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:08.353	1969-12-31 15:59:57.374	true	NULL
+24	4432	528534767	NULL	24.0	4432.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:02.541	1969-12-31 16:00:10.895	true	NULL
+36	-15912	528534767	NULL	36.0	-15912.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:04.432	1969-12-31 16:00:04.376	true	NULL
+-23	-10154	528534767	NULL	-23.0	-10154.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:44.088	1969-12-31 15:59:56.086	true	NULL
+-55	-7449	528534767	NULL	-55.0	-7449.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:49.846	1969-12-31 15:59:55.75	true	NULL
+-11	7476	528534767	NULL	-11.0	7476.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.551	1969-12-31 15:59:57.567	true	NULL
+51	-4490	528534767	NULL	51.0	-4490.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:07.476	1969-12-31 15:59:49.318	true	NULL
+-24	163	528534767	NULL	-24.0	163.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:55.51	1969-12-31 16:00:04.014	true	NULL
+-44	-1299	528534767	NULL	-44.0	-1299.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:00.163	1969-12-31 15:59:47.687	true	NULL
+8	7860	528534767	NULL	8.0	7860.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:58.701	1969-12-31 16:00:01.97	true	NULL
+24	-4812	528534767	NULL	24.0	-4812.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:07.86	1969-12-31 15:59:55	true	NULL
+4	-14739	528534767	NULL	4.0	-14739.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:55.188	1969-12-31 16:00:15.26	true	NULL
+-57	-11492	528534767	NULL	-57.0	-11492.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:45.261	1969-12-31 16:00:05.306	true	NULL
+-22	3856	528534767	NULL	-22.0	3856.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:48.508	1969-12-31 15:59:54.534	true	NULL
+28	8035	528534767	NULL	28.0	8035.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:03.856	1969-12-31 15:59:55.95	true	NULL
+-16	-7964	528534767	NULL	-16.0	-7964.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:08.035	1969-12-31 16:00:12.464	true	NULL
+46	6958	528534767	NULL	46.0	6958.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.036	1969-12-31 16:00:10.191	true	NULL
+29	-1990	528534767	NULL	29.0	-1990.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:06.958	1969-12-31 15:59:52.902	true	NULL
+-56	8402	528534767	NULL	-56.0	8402.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:58.01	1969-12-31 16:00:05.146	true	NULL
+-16	-6922	528534767	NULL	-16.0	-6922.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:08.402	1969-12-31 15:59:50.561	true	NULL
+38	-6583	528534767	NULL	38.0	-6583.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:53.078	1969-12-31 16:00:06.722	true	NULL
+-54	-10268	528534767	NULL	-54.0	-10268.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:53.417	1969-12-31 16:00:00.687	true	NULL
+-23	4587	528534767	NULL	-23.0	4587.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:49.732	1969-12-31 15:59:48.52	true	NULL
+-19	1206	528534767	NULL	-19.0	1206.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:04.587	1969-12-31 16:00:08.381	true	NULL
+40	-7984	528534767	NULL	40.0	-7984.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:01.206	1969-12-31 16:00:02.59	true	NULL
+62	6557	528534767	NULL	62.0	6557.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.016	1969-12-31 16:00:00.367	true	NULL
+-34	4181	528534767	NULL	-34.0	4181.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:06.557	1969-12-31 16:00:04.869	true	NULL
+53	-10129	528534767	NULL	53.0	-10129.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:04.181	1969-12-31 16:00:08.061	true	NULL
+51	-15790	528534767	NULL	51.0	-15790.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:49.871	1969-12-31 15:59:57.821	true	NULL
+-4	2617	528534767	NULL	-4.0	2617.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:44.21	1969-12-31 15:59:44.733	true	NULL
+61	12161	528534767	NULL	61.0	12161.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:02.617	1969-12-31 16:00:10.536	true	NULL
+19	7952	528534767	NULL	19.0	7952.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:12.161	1969-12-31 16:00:00.95	true	NULL
+-33	7350	528534767	NULL	-33.0	7350.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:07.952	1969-12-31 15:59:48.183	true	NULL
+53	-12171	528534767	NULL	53.0	-12171.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:07.35	1969-12-31 15:59:57.549	true	NULL
+18	-3045	528534767	NULL	18.0	-3045.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:47.829	1969-12-31 16:00:05.045	true	NULL
+30	-814	528534767	NULL	30.0	-814.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:56.955	1969-12-31 16:00:11.799	true	NULL
+-36	1639	528534767	NULL	-36.0	1639.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:00.186	1969-12-31 16:00:13.098	true	NULL
+34	-15059	528534767	NULL	34.0	-15059.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:01.639	1969-12-31 16:00:13.206	true	NULL
+-55	-7353	528534767	NULL	-55.0	-7353.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:44.941	1969-12-31 15:59:54.268	true	NULL
+-40	-4463	528534767	NULL	-40.0	-4463.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.647	1969-12-31 15:59:46.254	true	NULL
+21	11737	528534767	NULL	21.0	11737.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:55.537	1969-12-31 15:59:45.022	true	NULL
+61	-1254	528534767	NULL	61.0	-1254.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:11.737	1969-12-31 16:00:12.004	true	NULL
+-59	10688	528534767	NULL	-59.0	10688.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:58.746	1969-12-31 16:00:15.489	true	NULL
+0	-3166	528534767	NULL	0.0	-3166.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:10.688	1969-12-31 16:00:01.385	true	NULL
+-21	3168	528534767	NULL	-21.0	3168.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:56.834	1969-12-31 16:00:13.331	true	NULL
+-33	14072	528534767	NULL	-33.0	14072.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:03.168	1969-12-31 15:59:55.836	true	NULL
+-30	834	528534767	NULL	-30.0	834.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:14.072	1969-12-31 16:00:03.004	true	NULL
+-5	-13229	528534767	NULL	-5.0	-13229.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:00.834	1969-12-31 16:00:00.388	true	NULL
+-53	-3419	528534767	NULL	-53.0	-3419.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:46.771	1969-12-31 15:59:53.744	true	NULL
+34	-4255	528534767	NULL	34.0	-4255.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:56.581	1969-12-31 15:59:57.88	true	NULL
+-5	12422	528534767	NULL	-5.0	12422.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:55.745	1969-12-31 15:59:48.802	true	NULL
+27	-14965	528534767	NULL	27.0	-14965.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:12.422	1969-12-31 16:00:09.517	true	NULL
+NULL	-3012	528534767	NULL	NULL	-3012.0	cvLH6Eat2yFsyy7p	NULL	NULL	1969-12-31 16:00:03.756	true	NULL
+-21	-7183	528534767	NULL	-21.0	-7183.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:45.035	1969-12-31 16:00:06.182	true	NULL
+43	1475	528534767	NULL	43.0	1475.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:56.988	1969-12-31 16:00:03.442	true	NULL
+41	37	528534767	NULL	41.0	37.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:52.817	1969-12-31 15:59:53.672	true	NULL
+-28	6453	528534767	NULL	-28.0	6453.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:01.475	1969-12-31 16:00:07.828	true	NULL
+-5	-14379	528534767	NULL	-5.0	-14379.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:00.037	1969-12-31 15:59:49.141	true	NULL
+13	1358	528534767	NULL	13.0	1358.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:06.453	1969-12-31 16:00:00.423	true	NULL
+-45	-14072	528534767	NULL	-45.0	-14072.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:45.621	1969-12-31 15:59:45.914	true	NULL
+10	9366	528534767	NULL	10.0	9366.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:01.358	1969-12-31 15:59:50.592	true	NULL
+-22	77	528534767	NULL	-22.0	77.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:45.928	1969-12-31 15:59:43.621	true	NULL
+38	-4667	528534767	NULL	38.0	-4667.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:09.366	1969-12-31 15:59:52.334	true	NULL
+-48	13300	528534767	NULL	-48.0	13300.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:00.077	1969-12-31 15:59:45.827	true	NULL
+2	1345	528534767	NULL	2.0	1345.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:55.333	1969-12-31 16:00:00.517	true	NULL
+-37	-12472	528534767	NULL	-37.0	-12472.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:13.3	1969-12-31 15:59:55.998	true	NULL
+-43	486	528534767	NULL	-43.0	486.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:01.345	1969-12-31 15:59:52.667	true	NULL
+36	14907	528534767	NULL	36.0	14907.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:47.528	1969-12-31 15:59:47.206	true	NULL
+-1	-75	528534767	NULL	-1.389	-863.257	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:49.331	1969-12-31 16:00:07.585	true	NULL
+-12	-2013	528534767	NULL	-12.0	-2013.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:14.907	1969-12-31 15:59:58.789	true	NULL
+0	15626	528534767	NULL	0.0	15626.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:09.566	1969-12-31 16:00:15.217	true	NULL
+26	3961	528534767	NULL	26.0	3961.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:57.987	1969-12-31 15:59:52.232	true	NULL
+-22	8499	528534767	NULL	-22.0	8499.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:15.626	1969-12-31 16:00:10.923	true	NULL
+9	9169	528534767	NULL	9.0	9169.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:03.961	1969-12-31 16:00:14.126	true	NULL
+-13	-13372	528534767	NULL	-13.0	-13372.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:08.499	1969-12-31 15:59:48.221	true	NULL
+38	-11320	528534767	NULL	38.0	-11320.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 16:00:09.169	1969-12-31 16:00:03.822	true	NULL
+-4	-1027	528534767	NULL	-4.0	-1027.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:46.628	1969-12-31 16:00:11.413	true	NULL
+-47	-2468	528534767	NULL	-47.0	-2468.0	cvLH6Eat2yFsyy7p	NULL	1969-12-31 15:59:48.68	1969-12-31 16:00:02.94	true	NULL
+PREHOOK: query: select csmallint, cstring1, cboolean2 from text_llap100 order by csmallint, cstring1, cboolean2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+POSTHOOK: query: select csmallint, cstring1, cboolean2 from text_llap100 order by csmallint, cstring1, cboolean2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@text_llap100
+#### A masked pattern was here ####
+-15912	cvLH6Eat2yFsyy7p	NULL
+-15813	cvLH6Eat2yFsyy7p	NULL
+-15790	cvLH6Eat2yFsyy7p	NULL
+-15549	cvLH6Eat2yFsyy7p	NULL
+-15431	cvLH6Eat2yFsyy7p	NULL
+-15059	cvLH6Eat2yFsyy7p	NULL
+-14965	cvLH6Eat2yFsyy7p	NULL
+-14739	cvLH6Eat2yFsyy7p	NULL
+-14379	cvLH6Eat2yFsyy7p	NULL
+-14072	cvLH6Eat2yFsyy7p	NULL
+-13372	cvLH6Eat2yFsyy7p	NULL
+-13326	cvLH6Eat2yFsyy7p	NULL
+-13229	cvLH6Eat2yFsyy7p	NULL
+-12472	cvLH6Eat2yFsyy7p	NULL
+-12171	cvLH6Eat2yFsyy7p	NULL
+-12083	cvLH6Eat2yFsyy7p	NULL
+-11492	cvLH6Eat2yFsyy7p	NULL
+-11320	cvLH6Eat2yFsyy7p	NULL
+-10909	cvLH6Eat2yFsyy7p	NULL
+-10268	cvLH6Eat2yFsyy7p	NULL
+-10154	cvLH6Eat2yFsyy7p	NULL
+-10129	cvLH6Eat2yFsyy7p	NULL
+-9566	cvLH6Eat2yFsyy7p	NULL
+-7984	cvLH6Eat2yFsyy7p	NULL
+-7964	cvLH6Eat2yFsyy7p	NULL
+-7824	cvLH6Eat2yFsyy7p	NULL
+-7735	cvLH6Eat2yFsyy7p	NULL
+-7449	cvLH6Eat2yFsyy7p	NULL
+-7353	cvLH6Eat2yFsyy7p	NULL
+-7183	cvLH6Eat2yFsyy7p	NULL
+-6922	cvLH6Eat2yFsyy7p	NULL
+-6583	cvLH6Eat2yFsyy7p	NULL
+-4812	cvLH6Eat2yFsyy7p	NULL
+-4667	cvLH6Eat2yFsyy7p	NULL
+-4490	cvLH6Eat2yFsyy7p	NULL
+-4463	cvLH6Eat2yFsyy7p	NULL
+-4255	cvLH6Eat2yFsyy7p	NULL
+-4213	cvLH6Eat2yFsyy7p	NULL
+-3419	cvLH6Eat2yFsyy7p	NULL
+-3166	cvLH6Eat2yFsyy7p	NULL
+-3045	cvLH6Eat2yFsyy7p	NULL
+-3012	cvLH6Eat2yFsyy7p	NULL
+-2468	cvLH6Eat2yFsyy7p	NULL
+-2013	cvLH6Eat2yFsyy7p	NULL
+-1990	cvLH6Eat2yFsyy7p	NULL
+-1724	cvLH6Eat2yFsyy7p	NULL
+-1299	cvLH6Eat2yFsyy7p	NULL
+-1254	cvLH6Eat2yFsyy7p	NULL
+-1027	cvLH6Eat2yFsyy7p	NULL
+-814	cvLH6Eat2yFsyy7p	NULL
+-75	cvLH6Eat2yFsyy7p	NULL
+10	cvLH6Eat2yFsyy7p	NULL
+37	cvLH6Eat2yFsyy7p	NULL
+77	cvLH6Eat2yFsyy7p	NULL
+163	cvLH6Eat2yFsyy7p	NULL
+486	cvLH6Eat2yFsyy7p	NULL
+834	cvLH6Eat2yFsyy7p	NULL
+1206	cvLH6Eat2yFsyy7p	NULL
+1345	cvLH6Eat2yFsyy7p	NULL
+1358	cvLH6Eat2yFsyy7p	NULL
+1475	cvLH6Eat2yFsyy7p	NULL
+1639	cvLH6Eat2yFsyy7p	NULL
+2541	cvLH6Eat2yFsyy7p	NULL
+2617	cvLH6Eat2yFsyy7p	NULL
+3168	cvLH6Eat2yFsyy7p	NULL
+3856	cvLH6Eat2yFsyy7p	NULL
+3961	cvLH6Eat2yFsyy7p	NULL
+4181	cvLH6Eat2yFsyy7p	NULL
+4432	cvLH6Eat2yFsyy7p	NULL
+4587	cvLH6Eat2yFsyy7p	NULL
+4963	cvLH6Eat2yFsyy7p	NULL
+5521	cvLH6Eat2yFsyy7p	NULL
+5780	cvLH6Eat2yFsyy7p	NULL
+6453	cvLH6Eat2yFsyy7p	NULL
+6557	cvLH6Eat2yFsyy7p	NULL
+6958	cvLH6Eat2yFsyy7p	NULL
+7021	cvLH6Eat2yFsyy7p	NULL
+7350	cvLH6Eat2yFsyy7p	NULL
+7476	cvLH6Eat2yFsyy7p	NULL
+7860	cvLH6Eat2yFsyy7p	NULL
+7952	cvLH6Eat2yFsyy7p	NULL
+8035	cvLH6Eat2yFsyy7p	NULL
+8353	cvLH6Eat2yFsyy7p	NULL
+8402	cvLH6Eat2yFsyy7p	NULL
+8499	cvLH6Eat2yFsyy7p	NULL
+9169	cvLH6Eat2yFsyy7p	NULL
+9366	cvLH6Eat2yFsyy7p	NULL
+9472	cvLH6Eat2yFsyy7p	NULL
+10688	cvLH6Eat2yFsyy7p	NULL
+11242	cvLH6Eat2yFsyy7p	NULL
+11737	cvLH6Eat2yFsyy7p	NULL
+12161	cvLH6Eat2yFsyy7p	NULL
+12422	cvLH6Eat2yFsyy7p	NULL
+13026	cvLH6Eat2yFsyy7p	NULL
+13300	cvLH6Eat2yFsyy7p	NULL
+14072	cvLH6Eat2yFsyy7p	NULL
+14625	cvLH6Eat2yFsyy7p	NULL
+14907	cvLH6Eat2yFsyy7p	NULL
+15007	cvLH6Eat2yFsyy7p	NULL
+15626	cvLH6Eat2yFsyy7p	NULL
+PREHOOK: query: DROP TABLE text_llap
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@text_llap
+PREHOOK: Output: default@text_llap
+POSTHOOK: query: DROP TABLE text_llap
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@text_llap
+POSTHOOK: Output: default@text_llap

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
index b84aeb5..a9263ec 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DiskRangeList.java
@@ -207,4 +207,10 @@ public class DiskRangeList extends DiskRange {
       return result;
     }
   }
+
+  public void setEnd(long newEnd) {
+    assert newEnd >= this.offset;
+    assert this.next == null || this.next.offset >= newEnd;
+    this.end = newEnd;
+  }
 }
\ No newline at end of file

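The new setEnd lets a caller trim a range in place while asserting two invariants: the new end may not precede the range's own offset, and it may not run into the next range in the list. A small standalone illustration of those checks; Range is an illustrative stand-in for DiskRangeList.

final class RangeSketch {
  /** Illustrative stand-in for DiskRangeList: an [offset, end) range in a singly linked list. */
  static final class Range {
    long offset, end;
    Range next;
    Range(long offset, long end) { this.offset = offset; this.end = end; }

    /** Same invariants as the new DiskRangeList.setEnd: stay past offset, never overlap next. */
    void setEnd(long newEnd) {
      assert newEnd >= this.offset : "end would precede offset";
      assert this.next == null || this.next.offset >= newEnd : "would overlap the next range";
      this.end = newEnd;
    }
  }

  public static void main(String[] args) {
    Range a = new Range(0, 100);
    Range b = new Range(100, 200);
    a.next = b;
    a.setEnd(80);     // fine: trims the first range
    // a.setEnd(150); // would trip the overlap assertion when run with -ea
    System.out.println(a.offset + ".." + a.end + " -> " + b.offset + ".." + b.end);
  }
}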
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
index 13772c9..b894c11e 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
@@ -22,6 +22,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A block of data for a given section of a file, similar to VRB but in encoded form.
  * Stores a set of buffers for each encoded stream that is a part of each column.
@@ -72,6 +75,19 @@ public class EncodedColumnBatch<BatchKey> {
     public void setIndexBaseOffset(int indexBaseOffset) {
       this.indexBaseOffset = indexBaseOffset;
     }
+
+    @Override
+    public String toString() {
+      String bufStr = "";
+      if (cacheBuffers != null) {
+        for (MemoryBuffer mb : cacheBuffers) {
+          bufStr += mb.getClass().getSimpleName() + " with " + mb.getByteBufferRaw().remaining() + " bytes, ";
+        }
+      }
+      return "ColumnStreamData [cacheBuffers=[" + bufStr
+          + "], indexBaseOffset=" + indexBaseOffset + "]";
+    }
+
   }
 
   /** The key that is used to map this batch to source location. */
@@ -104,6 +120,7 @@ public class EncodedColumnBatch<BatchKey> {
     }
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(EncodedColumnBatch.class);
   public void setStreamData(int colIx, int streamIx, ColumnStreamData csd) {
     assert hasData[colIx];
     columnData[colIx][streamIx] = csd;


[3/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
new file mode 100644
index 0000000..e84f5cc
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
+import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+class LlapRecordReader
+    implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
+
+  private final FileSplit split;
+  private final List<Integer> columnIds;
+  private final SearchArgument sarg;
+  private final String[] columnNames;
+  private final VectorizedRowBatchCtx rbCtx;
+  private final Object[] partitionValues;
+
+  private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
+  private ColumnVectorBatch lastCvb = null;
+  private boolean isFirst = true;
+
+  private Throwable pendingError = null;
+  /** Vector that is currently being processed by our user. */
+  private boolean isDone = false;
+  private final boolean isClosed = false;
+  private final ConsumerFeedback<ColumnVectorBatch> feedback;
+  private final QueryFragmentCounters counters;
+  private long firstReturnTime;
+
+  private final JobConf jobConf;
+  private final boolean[] includedColumns;
+  private final ReadPipeline rp;
+  private final ExecutorService executor;
+
+  private TypeDescription fileSchema;
+
+  public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
+      String hostName, ColumnVectorProducer cvp, ExecutorService executor,
+      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter)
+          throws IOException, HiveException {
+    this.executor = executor;
+    this.jobConf = job;
+    this.split = split;
+    this.columnIds = includedCols;
+    this.sarg = ConvertAstToSearchArg.createFromConf(job);
+    this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
+    final String fragmentId = LlapTezUtils.getFragmentId(job);
+    final String dagId = LlapTezUtils.getDagId(job);
+    final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    TezCounters taskCounters = null;
+    if (fragmentId != null) {
+      MDC.put("fragmentId", fragmentId);
+      taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
+      LOG.info("Received fragment id: {}", fragmentId);
+    } else {
+      LOG.warn("Not using tez counters as fragment id string is null");
+    }
+    this.counters = new QueryFragmentCounters(job, taskCounters);
+    this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
+
+    MapWork mapWork = Utilities.getMapWork(job);
+    VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
+    rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);
+
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
+    } else {
+      partitionValues = null;
+    }
+
+    boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+    TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
+        job, isAcidScan, Integer.MAX_VALUE);
+
+    // Create the consumer of encoded data; it will coordinate decoding to CVBs.
+    feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames,
+        counters, schema, sourceInputFormat, sourceSerDe, reporter, job);
+    fileSchema = rp.getFileSchema();
+    includedColumns = rp.getIncludedColumns();
+  }
+
+  /**
+   * Starts the data read pipeline
+   */
+  public boolean init() {
+    if (!checkOrcSchemaEvolution()) return false;
+
+    // perform the data read asynchronously
+    if (executor instanceof StatsRecordingThreadPool) {
+      // Every thread created by this thread pool will use the same handler
+      ((StatsRecordingThreadPool) executor).setUncaughtExceptionHandler(
+          new IOUncaughtExceptionHandler());
+    }
+    executor.submit(rp.getReadCallable());
+    return true;
+  }
+
+  private boolean checkOrcSchemaEvolution() {
+    SchemaEvolution schemaEvolution = new SchemaEvolution(
+        fileSchema, rp.getReaderSchema(), includedColumns);
+    for (Integer colId : columnIds) {
+      if (!schemaEvolution.isPPDSafeConversion(colId)) {
+        LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+    assert value != null;
+    if (isClosed) {
+      throw new AssertionError("next called after close");
+    }
+    // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+    boolean wasFirst = isFirst;
+    if (isFirst) {
+      if (partitionValues != null) {
+        rbCtx.addPartitionColsToBatch(value, partitionValues);
+      }
+      isFirst = false;
+    }
+    ColumnVectorBatch cvb = null;
+    try {
+      cvb = nextCvb();
+    } catch (InterruptedException e) {
+      // Query might have been canceled. Stop the background processing.
+      feedback.stop();
+      throw new IOException(e);
+    }
+    if (cvb == null) {
+      if (wasFirst) {
+        firstReturnTime = counters.startTimeCounter();
+      }
+      counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
+      return false;
+    }
+    if (columnIds.size() != cvb.cols.length) {
+      throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
+          + " included, but the reader returned " + cvb.cols.length);
+    }
+    // VRB was created from VrbCtx, so we already have pre-allocated column vectors
+    for (int i = 0; i < cvb.cols.length; ++i) {
+      // Return old CVs (if any) to caller. We assume these things all have the same schema.
+      cvb.swapColumnVector(i, value.cols, columnIds.get(i));
+    }
+    value.selectedInUse = false;
+    value.size = cvb.size;
+    if (wasFirst) {
+      firstReturnTime = counters.startTimeCounter();
+    }
+    return true;
+  }
+
+  public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
+    return rbCtx;
+  }
+
+  private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(final Thread t, final Throwable e) {
+      LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
+          " Message: {}", t.getName(), t.getId(), e.getMessage());
+      setError(e);
+    }
+  }
+
+  ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
+    boolean isFirst = (lastCvb == null);
+    if (!isFirst) {
+      feedback.returnData(lastCvb);
+    }
+    synchronized (pendingData) {
+      // We are waiting for next block. Either we will get it, or be told we are done.
+      boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
+      if (doLogBlocking) {
+        LlapIoImpl.LOG.trace("next will block");
+      }
+      while (isNothingToReport()) {
+        pendingData.wait(100);
+      }
+      if (doLogBlocking) {
+        LlapIoImpl.LOG.trace("next is unblocked");
+      }
+      rethrowErrorIfAny();
+      lastCvb = pendingData.poll();
+    }
+    if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
+      LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
+    }
+    return lastCvb;
+  }
+
+  private boolean isNothingToReport() {
+    return !isDone && pendingData.isEmpty() && pendingError == null;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return rbCtx.createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
+          isClosed, isDone, pendingError, pendingData.size());
+    }
+    LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
+    feedback.stop();
+    rethrowErrorIfAny();
+    MDC.clear();
+  }
+
+  private void rethrowErrorIfAny() throws IOException {
+    if (pendingError == null) return;
+    if (pendingError instanceof IOException) {
+      throw (IOException)pendingError;
+    }
+    throw new IOException(pendingError);
+  }
+
+  @Override
+  public void setDone() {
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("setDone called; closed {}, done {}, err {}, pending {}",
+          isClosed, isDone, pendingError, pendingData.size());
+    }
+    synchronized (pendingData) {
+      isDone = true;
+      pendingData.notifyAll();
+    }
+  }
+
+  @Override
+  public void consumeData(ColumnVectorBatch data) {
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("consume called; closed {}, done {}, err {}, pending {}",
+          isClosed, isDone, pendingError, pendingData.size());
+    }
+    synchronized (pendingData) {
+      if (isClosed) {
+        return;
+      }
+      pendingData.add(data);
+      pendingData.notifyAll();
+    }
+  }
+
+  @Override
+  public void setError(Throwable t) {
+    counters.incrCounter(LlapIOCounters.NUM_ERRORS);
+    LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
+        isClosed, isDone, pendingError, pendingData.size());
+    assert t != null;
+    synchronized (pendingData) {
+      pendingError = t;
+      pendingData.notifyAll();
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    // TODO: plumb progress info thru the reader if we can get metadata from loader first.
+    return 0.0f;
+  }
+}
\ No newline at end of file

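LlapRecordReader above hands column vector batches from the asynchronous read pipeline to the caller through a simple monitor: consumeData, setDone, and setError publish state under the pendingData lock and notify, while nextCvb blocks until a batch, an error, or end-of-data is available. A stripped-down sketch of that handoff; the class and method names here are illustrative, not the Hive types.

import java.util.ArrayDeque;
import java.util.Queue;

final class HandoffSketch<T> {
  private final Queue<T> pending = new ArrayDeque<>();
  private boolean done = false;
  private Throwable error = null;

  /** Producer side: publish a batch and wake any waiting consumer. */
  synchronized void consume(T item) { pending.add(item); notifyAll(); }
  synchronized void setDone() { done = true; notifyAll(); }
  synchronized void setError(Throwable t) { error = t; notifyAll(); }

  /** Consumer side: block until a batch, an error, or end-of-data is available. */
  synchronized T next() throws InterruptedException {
    while (!done && pending.isEmpty() && error == null) {
      wait(100); // re-check periodically, as nextCvb does
    }
    if (error != null) {
      throw new RuntimeException(error);
    }
    return pending.poll(); // null means the producer reported done and nothing is pending
  }
}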
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index db86296..2e4f2ba 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -25,7 +25,11 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
 
 /**
@@ -33,7 +37,8 @@ import org.apache.orc.TypeDescription;
  */
 public interface ColumnVectorProducer {
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
-                                  List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-                                  QueryFragmentCounters counters,
-                                  TypeDescription readerSchema) throws IOException;
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters, TypeDescription readerSchema,
+      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter,
+      JobConf job) throws IOException;
 }

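The widened createReadPipeline signature now also carries the source InputFormat, Deserializer, Reporter, and JobConf, which is what allows the new GenericColumnVectorProducer below to build a pipeline over arbitrary, non-ORC formats. A schematic sketch of threading such extra source context through a factory interface; every type and name here is an illustrative placeholder, not the Hive API.

import java.util.function.Consumer;

/** Illustrative placeholder for a producer/factory interface, not ColumnVectorProducer itself. */
interface PipelineFactory<T> {
  // The extra sourceReader/sourceDecoder arguments carry "how to read the raw source format".
  Runnable createPipeline(Consumer<T> consumer, Object split,
      Object sourceReader, Object sourceDecoder);
}

final class GenericFactory implements PipelineFactory<String> {
  @Override
  public Runnable createPipeline(Consumer<String> consumer, Object split,
      Object sourceReader, Object sourceDecoder) {
    // A real pipeline would read the split with sourceReader, decode rows with sourceDecoder,
    // encode them into a cacheable form, and feed the consumer; here we only emit a marker.
    return () -> consumer.accept("decoded " + split);
  }
}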
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 6b54b30..04fed44 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.TypeDescription;
 
 public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
   implements Consumer<BatchType>, ReadPipeline {
@@ -122,4 +123,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
     // We are just a relay; send unpause to encoded data producer.
     upstreamFeedback.unpause();
   }
+
+  @Override
+  public TypeDescription getFileSchema() {
+    // TODO: the ORC-specific method should be removed from the interface instead.
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
new file mode 100644
index 0000000..d384b85
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.decode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public class GenericColumnVectorProducer implements ColumnVectorProducer {
+  private final SerDeLowLevelCacheImpl cache;
+  private final BufferUsageManager bufferManager;
+  private final Configuration conf;
+  private final LlapDaemonCacheMetrics cacheMetrics;
+  private final LlapDaemonIOMetrics ioMetrics;
+
+  public GenericColumnVectorProducer(SerDeLowLevelCacheImpl serdeCache,
+      BufferUsageManager bufferManager, Configuration conf, LlapDaemonCacheMetrics cacheMetrics,
+      LlapDaemonIOMetrics ioMetrics) {
+    LlapIoImpl.LOG.info("Initializing serde-based (generic) column vector producer");
+    this.cache = serdeCache;
+    this.bufferManager = bufferManager;
+    this.conf = conf;
+    this.cacheMetrics = cacheMetrics;
+    this.ioMetrics = ioMetrics;
+  }
+
+  @Override
+  public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters, TypeDescription schema, InputFormat<?, ?> sourceInputFormat,
+      Deserializer sourceSerDe, Reporter reporter, JobConf job) throws IOException {
+    cacheMetrics.incrCacheReadRequests();
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
+        consumer, columnIds.size(), false, counters, ioMetrics);
+    TextFileMetadata fm;
+    try {
+      fm = new TextFileMetadata(sourceSerDe);
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+    edc.setFileMetadata(fm);
+    // Note that we pass job config to the record reader, but use global config for LLAP IO.
+    SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache,
+        bufferManager, conf, split, columnIds, edc, job, reporter, sourceInputFormat,
+        sourceSerDe, counters, fm.getSchema());
+    edc.init(reader, reader);
+    LlapIoImpl.LOG.info("Ignoring schema: " + schema);
+    return edc;
+  }
+
+
+  public static final class TextStripeMetadata implements ConsumerStripeMetadata {
+    // The writer is local to the process.
+    private final String writerTimezone = TimeZone.getDefault().getID();
+    private List<ColumnEncoding> encodings;
+    private final int stripeIx;
+    private long rowCount = -1;
+
+    public TextStripeMetadata(int stripeIx) {
+      this.stripeIx = stripeIx;
+    }
+
+    @Override
+    public String getWriterTimezone() {
+      return writerTimezone;
+    }
+
+    @Override
+    public int getStripeIx() {
+      return stripeIx;
+    }
+
+    @Override
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    @Override
+    public List<ColumnEncoding> getEncodings() {
+      return encodings;
+    }
+
+    @Override
+    public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+      throw new UnsupportedOperationException();
+    }
+
+    public void setEncodings(List<ColumnEncoding> encodings) {
+      this.encodings = encodings;
+    }
+
+    @Override
+    public RowIndex[] getRowIndexes() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean supportsRowIndexes() {
+      return false;
+    }
+
+    public void setRowCount(long value) {
+      rowCount = value;
+    }
+
+    @Override
+    public String toString() {
+      return "[stripeIx=" + stripeIx + ", rowCount=" + rowCount + ", encodings=" + encodings + "]".replace('\n', ' ');
+    }
+  }
+
+
+  private static final class TextFileMetadata implements ConsumerFileMetadata {
+    private final List<Type> orcTypes = new ArrayList<>();
+    private final TypeDescription schema;
+    public TextFileMetadata(Deserializer sourceSerDe) throws SerDeException {
+      TypeDescription schema = OrcInputFormat.convertTypeInfo(
+          TypeInfoUtils.getTypeInfoFromObjectInspector(sourceSerDe.getObjectInspector()));
+      this.schema = schema;
+      addTypesFromSchema(schema);
+    }
+
+    private void addTypesFromSchema(TypeDescription schema) {
+      // The same thing that WriterImpl does when writing the footer, but w/o the footer.
+      OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+      List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
+      orcTypes.add(type.build());
+      if (children == null) return;
+      for(TypeDescription child : children) {
+        addTypesFromSchema(child);
+      }
+    }
+
+    @Override
+    public List<Type> getTypes() {
+      return orcTypes;
+    }
+
+    @Override
+    public int getStripeCount() {
+      return 1;
+    }
+
+    @Override
+    public CompressionKind getCompressionKind() {
+      return CompressionKind.NONE;
+    }
+
+    @Override
+    public TypeDescription getSchema() {
+      return schema;
+    }
+  }
+}
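
As a rough standalone illustration of the schema derivation that TextFileMetadata performs above (LazySimpleSerDe and the column properties are made up for the example; the ObjectInspector -> TypeInfo -> TypeDescription conversion calls are the ones the class itself uses):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.orc.TypeDescription;

    public class SchemaFromSerDeExample {
      public static void main(String[] args) throws Exception {
        // Describe a simple text table: (id int, name string).
        Properties props = new Properties();
        props.setProperty("columns", "id,name");
        props.setProperty("columns.types", "int,string");
        LazySimpleSerDe serDe = new LazySimpleSerDe();
        serDe.initialize(new Configuration(), props);

        // Same path as TextFileMetadata: ObjectInspector -> TypeInfo -> ORC TypeDescription.
        TypeDescription schema = OrcInputFormat.convertTypeInfo(
            TypeInfoUtils.getTypeInfoFromObjectInspector(serDe.getObjectInspector()));
        System.out.println(schema); // expected to print something like struct<id:int,name:string>
      }
    }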

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
new file mode 100644
index 0000000..7fecdaa
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/LlapTextInputFormat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.decode;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
+
+// TODO# VectorizedInputFormatInterface is a hack... only "vectorized" in LLAP IO.
+//       How to resolve optimizer dependency?
+public class LlapTextInputFormat extends org.apache.hadoop.mapred.TextInputFormat
+  implements LlapWrappableInputFormatInterface, VectorizedInputFormatInterface {
+
+  @Override
+  public boolean isSerdeBased() {
+    return true;
+  }
+}
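
A quick usage note on the two marker interfaces (standalone snippet; the actual decision to wrap a format with LLAP IO is made elsewhere in Hive, so this only shows what the markers expose):

    LlapTextInputFormat in = new LlapTextInputFormat();
    // Marks the format as eligible for LLAP IO wrapping.
    boolean wrappable = in instanceof LlapWrappableInputFormatInterface; // true
    // Signals that LLAP IO should use the serde-based (row re-encoding) path rather than
    // reading native ORC data.
    boolean serdeBased = in.isSerdeBased();                              // true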

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 2e9b9c3..565e3d2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -34,7 +34,11 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
 
 public class OrcColumnVectorProducer implements ColumnVectorProducer {
@@ -63,12 +67,14 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
 
   @Override
   public ReadPipeline createReadPipeline(
-      Consumer<ColumnVectorBatch> consumer, FileSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException {
+      Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds,
+      SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters,
+      TypeDescription readerSchema, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
+      Reporter reporter, JobConf job) throws IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, ioMetrics);
+    // Note: we use global conf here and ignore JobConf.
     OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
         metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema);
     edc.init(reader, reader);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 29f1ba8..eb40d1f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -16,16 +16,16 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.impl.PositionProvider;
-import org.apache.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory.SettableTreeReader;
@@ -53,11 +52,12 @@ import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
 import org.apache.orc.impl.TreeReaderFactory.TreeReader;
-import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 import org.apache.orc.OrcProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 
 public class OrcEncodedDataConsumer
   extends EncodedDataConsumer<OrcBatchKey, OrcEncodedColumnBatch> {
@@ -65,9 +65,9 @@ public class OrcEncodedDataConsumer
   private TreeReaderFactory.TreeReader[] columnReaders;
   private int[] columnMapping; // Mapping from columnReaders (by index) to columns in file schema.
   private int previousStripeIndex = -1;
-  private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
+  private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file.
   private CompressionCodec codec;
-  private OrcStripeMetadata[] stripes;
+  private List<ConsumerStripeMetadata> stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
   private final QueryFragmentCounters counters;
   private boolean[] includedColumns;
@@ -82,16 +82,21 @@ public class OrcEncodedDataConsumer
     this.counters = counters;
   }
 
-  public void setFileMetadata(OrcFileMetadata f) {
+  public void setFileMetadata(ConsumerFileMetadata f) {
     assert fileMetadata == null;
     fileMetadata = f;
-    stripes = new OrcStripeMetadata[f.getStripes().size()];
-    codec = PhysicalFsWriter.createCodec(fileMetadata.getCompressionKind());
+    stripes = new ArrayList<>(f.getStripeCount());
+    codec = PhysicalFsWriter.createCodec(f.getCompressionKind());
   }
 
-  public void setStripeMetadata(OrcStripeMetadata m) {
+  public void setStripeMetadata(ConsumerStripeMetadata m) {
     assert stripes != null;
-    stripes[m.getStripeIx()] = m;
+    int newIx = m.getStripeIx();
+    for (int i = stripes.size(); i <= newIx; ++i) {
+      stripes.add(null);
+    }
+    assert stripes.get(newIx) == null;
+    stripes.set(newIx, m);
   }
 
   @Override
@@ -103,17 +108,18 @@ public class OrcEncodedDataConsumer
     boolean sameStripe = currentStripeIndex == previousStripeIndex;
 
     try {
-      OrcStripeMetadata stripeMetadata = stripes[currentStripeIndex];
+      ConsumerStripeMetadata stripeMetadata = stripes.get(currentStripeIndex);
       // Get non null row count from root column, to get max vector batches
       int rgIdx = batch.getBatchKey().rgIx;
       long nonNullRowCount = -1;
       if (rgIdx == OrcEncodedColumnBatch.ALL_RGS) {
         nonNullRowCount = stripeMetadata.getRowCount();
       } else {
-        OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[0].getEntry(rgIdx);
+        OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexEntry(0, rgIdx);
         nonNullRowCount = getRowCount(rowIndex);
       }
       int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
+      // LOG.info("TODO# expecting " + nonNullRowCount + " rows with at most " + maxBatchesRG);
       int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
       TypeDescription schema = fileMetadata.getSchema();
 
@@ -136,6 +142,7 @@ public class OrcEncodedDataConsumer
           batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
           if (batchSize == 0) break;
         }
+        // LOG.info("TODO# batch " + i + " of " + batchSize);
 
         ColumnVectorBatch cvb = cvbPool.take();
         // assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
@@ -148,6 +155,7 @@ public class OrcEncodedDataConsumer
             cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]), batchSize);
           }
           cvb.cols[idx].ensureSize(batchSize, false);
+          LOG.info("TODO# nextVector on " + idx + "; "+ reader + " for " + columnMapping[idx]);
           reader.nextVector(cvb.cols[idx], null, batchSize);
         }
 
@@ -155,6 +163,7 @@ public class OrcEncodedDataConsumer
         downstreamConsumer.consumeData(cvb);
         counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
       }
+      LOG.info("TODO# done with decode");
       counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
       counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
       counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
@@ -214,8 +223,9 @@ public class OrcEncodedDataConsumer
   }
 
   private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
-      EncodedColumnBatch<OrcBatchKey> batch, OrcStripeMetadata stripeMetadata) throws IOException {
+      EncodedColumnBatch<OrcBatchKey> batch, ConsumerStripeMetadata stripeMetadata) throws IOException {
     PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+    // LOG.info("TODO# positionInStreams pps " + Lists.newArrayList(pps));
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       columnReaders[i].seek(pps);
@@ -224,8 +234,9 @@ public class OrcEncodedDataConsumer
 
   private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
       EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe,
-      OrcStripeMetadata stripeMetadata) throws IOException {
+      ConsumerStripeMetadata stripeMetadata) throws IOException {
     PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+    // LOG.info("TODO# repositionInStreams pps " + Lists.newArrayList(pps));
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       TreeReader reader = columnReaders[i];
@@ -240,20 +251,46 @@ public class OrcEncodedDataConsumer
     }
   }
 
-  private PositionProvider[] createPositionProviders(TreeReaderFactory.TreeReader[] columnReaders,
-      EncodedColumnBatch<OrcBatchKey> batch, OrcStripeMetadata stripeMetadata) throws IOException {
-    if (columnReaders.length == 0) return null;
-    int rowGroupIndex = batch.getBatchKey().rgIx;
-    if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
-      throw new IOException("Cannot position readers without RG information");
+  /**
+   * Position provider used in absence of indexes, e.g. for serde-based reader, where each stream
+   * is in its own physical 'container', always starting at 0, and there are no RGs.
+   */
+  private final static class IndexlessPositionProvider implements PositionProvider {
+    @Override
+    public long getNext() {
+      return 0;
     }
-    // TODO: this assumes indexes in getRowIndexes would match column IDs
-    OrcProto.RowIndex[] ris = stripeMetadata.getRowIndexes();
-    PositionProvider[] pps = new PositionProvider[ris.length];
-    for (int i = 0; i < ris.length; ++i) {
-      OrcProto.RowIndex ri = ris[i];
-      if (ri == null) continue;
-      pps[i] = new RecordReaderImpl.PositionProviderImpl(ri.getEntry(rowGroupIndex));
+
+    @Override
+    public String toString() {
+      return "indexes not supported";
+    }
+  }
+
+  private PositionProvider[] createPositionProviders(
+      TreeReaderFactory.TreeReader[] columnReaders, EncodedColumnBatch<OrcBatchKey> batch,
+      ConsumerStripeMetadata stripeMetadata) throws IOException {
+    if (columnReaders.length == 0) return null;
+    PositionProvider[] pps = null;
+    if (!stripeMetadata.supportsRowIndexes()) {
+      PositionProvider singleRgPp = new IndexlessPositionProvider();
+      pps = new PositionProvider[stripeMetadata.getEncodings().size()];
+      for (int i = 0; i < pps.length; ++i) {
+        pps[i] = singleRgPp;
+      }
+    } else {
+      int rowGroupIndex = batch.getBatchKey().rgIx;
+      if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
+        throw new IOException("Cannot position readers without RG information");
+      }
+      // TODO: this assumes indexes in getRowIndexes would match column IDs
+      OrcProto.RowIndex[] ris = stripeMetadata.getRowIndexes();
+      pps = new PositionProvider[ris.length];
+      for (int i = 0; i < ris.length; ++i) {
+        OrcProto.RowIndex ri = ris[i];
+        if (ri == null) continue;
+        pps[i] = new RecordReaderImpl.PositionProviderImpl(ri.getEntry(rowGroupIndex));
+      }
     }
     return pps;
   }
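
For reference, both branches above only have to satisfy the small PositionProvider contract: getNext() returns the next recorded stream position. A toy provider over a fixed array (illustrative only, not the ORC implementation backed by RowIndexEntry) would look like:

    import org.apache.orc.impl.PositionProvider;

    // Replays a fixed list of positions, the way an index-backed provider replays the
    // positions recorded for a row group; the indexless provider above always answers 0
    // because each serde-written stream lives in its own buffer starting at offset 0.
    final class FixedPositionProvider implements PositionProvider {
      private final long[] positions;
      private int next = 0;
      FixedPositionProvider(long[] positions) { this.positions = positions; }
      @Override
      public long getNext() { return positions[next++]; }
    }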

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
index 4e1b851..36f6c9c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java
@@ -25,7 +25,7 @@ import org.apache.orc.TypeDescription;
 
 public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> {
   public Callable<Void> getReadCallable();
-  TypeDescription getFileSchema();
+  TypeDescription getFileSchema(); // TODO: this is ORC-specific and should be removed
   TypeDescription getReaderSchema();
   boolean[] getIncludedColumns();
 }
\ No newline at end of file


[2/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
new file mode 100644
index 0000000..a70545e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -0,0 +1,1248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.StripeData;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.TextStripeMetadata;
+import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.OrcFile.EncodingStrategy;
+import org.apache.orc.OrcFile.Version;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.OutStream.OutputReceiver;
+import org.apache.orc.impl.PhysicalWriter;
+import org.apache.orc.impl.StreamName;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class SerDeEncodedDataReader extends CallableWithNdc<Void>
+    implements ConsumerFeedback<OrcEncodedColumnBatch> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SerDeEncodedDataReader.class);
+  public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+        @Override
+        public ColumnStreamData create() {
+          return new ColumnStreamData();
+        }
+        @Override
+        public void resetBeforeOffer(ColumnStreamData t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+        @Override
+        public OrcEncodedColumnBatch create() {
+          return new OrcEncodedColumnBatch();
+        }
+        @Override
+        public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+          t.reset();
+        }
+      });
+  public static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+      @Override
+      public CacheChunk create() {
+        return new CacheChunk();
+      }
+      @Override
+      public void resetBeforeOffer(CacheChunk t) {
+        t.reset();
+      }
+    });
+  private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+    @Override
+    public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+      CacheChunk tcc = TCC_POOL.take();
+      tcc.init(buffer, offset, end);
+      return tcc;
+    }
+  };
+
+
+  private final SerDeLowLevelCacheImpl cache;
+  private final BufferUsageManager bufferManager;
+  private final Configuration conf;
+  private final FileSplit split;
+  private List<Integer> columnIds;
+  private final OrcEncodedDataConsumer consumer;
+  private final QueryFragmentCounters counters;
+  private final UserGroupInformation ugi;
+
+  private final Object fileKey;
+  private final FileSystem fs;
+
+  private volatile boolean isStopped = false;
+  private final Deserializer sourceSerDe;
+  private final InputFormat<?, ?> sourceInputFormat;
+  private final Reporter reporter;
+  private final JobConf jobConf;
+  private final int allocSize;
+  private final int targetSliceRowCount;
+
+  private final boolean[] writerIncludes;
+  private WriterImpl orcWriter = null;
+  private CacheWriter cacheWriter = null;
+  /**
+   * Data from cache currently being processed. We store it here so that we could decref
+   * it in case of failures. We remove each slice from the data after it has been sent to
+   * the consumer, at which point the consumer is responsible for it.
+   */
+  private FileData cachedData;
+
+  public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
+      BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
+      List<Integer> columnIds, OrcEncodedDataConsumer consumer,
+      JobConf jobConf, Reporter reporter, InputFormat<?, ?> sourceInputFormat,
+      Deserializer sourceSerDe, QueryFragmentCounters counters, TypeDescription schema)
+          throws IOException {
+    this.cache = cache;
+    this.bufferManager = bufferManager;
+    this.conf = daemonConf;
+    this.split = split;
+    this.columnIds = columnIds;
+    this.allocSize = determineAllocSize(bufferManager, daemonConf);
+    boolean isInTest = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_IN_TEST);
+    this.targetSliceRowCount = HiveConf.getIntVar(
+        isInTest ? jobConf : daemonConf, ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT);
+    LOG.info("TODO# targetSliceRowCount = " + targetSliceRowCount);
+    if (this.columnIds != null) {
+      Collections.sort(this.columnIds);
+    }
+    this.consumer = consumer;
+    this.counters = counters;
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    fs = split.getPath().getFileSystem(daemonConf);
+    fileKey = determineFileId(fs, split,
+        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
+    this.sourceInputFormat = sourceInputFormat;
+    this.sourceSerDe = sourceSerDe;
+    this.reporter = reporter;
+    this.jobConf = jobConf;
+    this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
+  }
+
+  private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
+    long allocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_ENCODE_ALLOC_SIZE);
+    int maxAllocSize = bufferManager.getAllocator().getMaxAllocation();
+    if (allocSize > maxAllocSize) {
+      LOG.error("Encode allocation size " + allocSize + " is being capped to the maximum "
+          + "allocation size " + bufferManager.getAllocator().getMaxAllocation());
+      allocSize = maxAllocSize;
+    }
+    return (int)allocSize;
+  }
+
+  @Override
+  public void stop() {
+    LOG.debug("Encoded reader is being stopped");
+    isStopped = true;
+  }
+
+  @Override
+  public void pause() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void unpause() {
+    throw new UnsupportedOperationException();
+  }
+
+  // TODO: move to base class?
+  @Override
+  protected Void callInternal() throws IOException, InterruptedException {
+    return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        return performDataRead();
+      }
+    });
+  }
+
+  private static final class LineRrOffsetReader extends PassThruOffsetReader {
+    private static final Method isCompressedMethod;
+    private final LineRecordReader lrReader;
+    private final LongWritable posKey;
+
+    static {
+      Method isCompressedMethodTmp;
+      try {
+        isCompressedMethodTmp = LineRecordReader.class.getDeclaredMethod("isCompressedInput");
+        isCompressedMethodTmp.setAccessible(true);
+      } catch (Throwable t) {
+        isCompressedMethodTmp = null;
+        LOG.info("TODO# cannot get LineRecordReader isCompressedInput method", t);
+      }
+      isCompressedMethod = isCompressedMethodTmp;
+    }
+
+    static ReaderWithOffsets create(LineRecordReader sourceReader) {
+      if (isCompressedMethod == null) return null;
+      Boolean isCompressed = null;
+      try {
+        isCompressed = (Boolean)isCompressedMethod.invoke(sourceReader);
+      } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+        LOG.info("TODO# cannot check the reader for compression", e);
+        return new PassThruOffsetReader(sourceReader);
+      }
+      if (isCompressed) return null; // Cannot slice compressed files.
+      return new LineRrOffsetReader(sourceReader);
+    }
+
+    private LineRrOffsetReader(LineRecordReader sourceReader) {
+      super(sourceReader);
+      this.lrReader = sourceReader;
+      this.posKey = (LongWritable)key;
+    }
+
+    @Override
+    public long getCurrentRowStartOffset() {
+      return posKey.get();
+    }
+
+    @Override
+    public long getCurrentFileOffset() {
+      try {
+        return lrReader.getPos();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  private static class PassThruOffsetReader implements ReaderWithOffsets {
+    protected final RecordReader sourceReader;
+    protected final Object key;
+    protected final Writable value;
+
+    private PassThruOffsetReader(RecordReader sourceReader) {
+      this.sourceReader = sourceReader;
+      key = sourceReader.createKey();
+      value = (Writable)sourceReader.createValue();
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return sourceReader.next(key, value);
+    }
+
+    @Override
+    public Writable getCurrentValue() {
+      return value;
+    }
+
+    @Override
+    public void close() throws IOException {
+      sourceReader.close();
+    }
+
+    @Override
+    public long getCurrentRowStartOffset() {
+      return -1;
+    }
+
+    @Override
+    public long getCurrentFileOffset() {
+      return -1;
+    }
+  }
+
+  public static class CacheOutStream extends OutStream {
+    private final CacheOutputReceiver receiver;
+    public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
+        CacheOutputReceiver receiver) throws IOException {
+      super(name, bufferSize, codec, receiver);
+      this.receiver = receiver;
+    }
+
+    @Override
+    public void clear() throws IOException {
+      super.clear();
+      receiver.clear();
+    }
+  }
+
+  /** Minimal record-reader wrapper that also exposes byte offsets, so rows can be mapped
+   * to file ranges for caching. Offsets may be -1 when the underlying reader cannot provide them. */
+  private interface ReaderWithOffsets {
+    /** Advances to the next row; returns false at end of input. */
+    boolean next() throws IOException;
+    /** Returns the value object for the current row. */
+    Writable getCurrentValue();
+    /** Returns the file offset at which the current row starts, or -1 if unknown. */
+    long getCurrentRowStartOffset();
+    void close() throws IOException;
+    /** Returns the reader's current position in the file, or -1 if unknown. */
+    long getCurrentFileOffset();
+  }
+
+  public static class CacheWriter implements PhysicalWriter {
+    // Struct.
+    private static class CacheStreamData {
+      private final List<MemoryBuffer> data;
+      private final boolean isSuppressed;
+      private final StreamName name;
+      public CacheStreamData(boolean isSuppressed, StreamName name, List<MemoryBuffer> data) {
+        this.isSuppressed = isSuppressed;
+        this.name = name;
+        this.data = data;
+      }
+      @Override
+      public String toString() {
+        return "CacheStreamData [name=" + name + ", isSuppressed="
+            + isSuppressed + ", data=" + toString(data) + "]";
+      }
+      private static String toString(List<MemoryBuffer> data) {
+        String s = "";
+        for (MemoryBuffer buffer : data) {
+          s += LlapDataBuffer.toDataString(buffer) + ", ";
+        }
+        return s;
+      }
+    }
+
+    private static class CacheStripeData {
+      private List<ColumnEncoding> encodings;
+      private long rowCount = -1;
+      private long knownTornStart, firstRowStart, lastRowStart, lastRowEnd;
+      private Map<Integer, List<CacheStreamData>> colStreams = new HashMap<>();
+      @Override
+      public String toString() {
+        return ("{disk data knownTornStart=" + knownTornStart
+            + ", firstRowStart=" + firstRowStart + ", lastRowStart="
+            + lastRowStart + ", lastRowEnd=" + lastRowEnd + ", rowCount=" + rowCount
+            + ", encodings=" + encodings + ", streams=" + colStreams + "}").replace('\n', ' ');
+      }
+
+      public String toCoordinateString() {
+        return "knownTornStart=" + knownTornStart + ", firstRowStart=" + firstRowStart
+            + ", lastRowStart=" + lastRowStart + ", lastRowEnd=" + lastRowEnd;
+      }
+    }
+
+    private CacheStripeData currentStripe;
+    private final List<CacheStripeData> stripes = new ArrayList<>();
+    private final BufferUsageManager bufferManager;
+    private final int bufferSize;
+    private final List<Integer> columnIds;
+    private final boolean[] writerIncludes;
+    // These are global since ORC reuses objects between stripes.
+    private final Map<StreamName, OutStream> streams = new HashMap<>();
+    private final Map<Integer, List<CacheOutStream>> colStreams = new HashMap<>();
+
+    public CacheWriter(BufferUsageManager bufferManager, int bufferSize, List<Integer> columnIds,
+        boolean[] writerIncludes) {
+      this.bufferManager = bufferManager;
+      this.bufferSize = bufferSize;
+      this.columnIds = columnIds;
+      this.writerIncludes = writerIncludes;
+      startStripe();
+    }
+
+    private void startStripe() {
+      if (currentStripe != null) {
+        stripes.add(currentStripe);
+      }
+      currentStripe = new CacheStripeData();
+    }
+
+    @Override
+    public void initialize() throws IOException {
+    }
+
+    @Override
+    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+      OrcProto.Metadata metadata = builder.build();
+      // LOG.info("TODO# Processing file metadata " + metadata);
+    }
+
+    @Override
+    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+      OrcProto.Footer footer = builder.build();
+      // LOG.info("TODO# Processing file footer " + footer);
+      validateIncludes(footer);
+    }
+
+    public void validateIncludes(OrcProto.Footer footer) throws IOException {
+      boolean[] translatedIncludes = columnIds == null ? null : OrcInputFormat.genIncludedColumns(
+          OrcUtils.convertTypeFromProtobuf(footer.getTypesList(), 0), columnIds);
+      if (translatedIncludes == null && writerIncludes == null) return;
+      if (translatedIncludes == null || writerIncludes == null) {
+        throwIncludesMismatchError(translatedIncludes);
+      }
+      int len = Math.min(translatedIncludes.length, writerIncludes.length);
+      for (int i = 0; i < len; ++i) {
+        // Translated includes may be a superset of writer includes due to cache.
+        if (!translatedIncludes[i] && writerIncludes[i]) {
+          throwIncludesMismatchError(translatedIncludes);
+        }
+      }
+      if (translatedIncludes.length < writerIncludes.length) {
+        for (int i = len; i < writerIncludes.length; ++i) {
+          if (writerIncludes[i]) {
+            throwIncludesMismatchError(translatedIncludes);
+          }
+        }
+      }
+
+    }
+
+    private String throwIncludesMismatchError(boolean[] translated) throws IOException {
+      String s = "Includes derived from the original table: " + DebugUtils.toString(writerIncludes)
+          + " but the ones derived from writer types are: " + DebugUtils.toString(translated);
+      LOG.error(s);
+      throw new IOException(s);
+    }
+
+    @Override
+    public void writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+      // LOG.info("TODO# Ignoring post script " + builder.build());
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Closed by the ORC writer; we still need the data. Do not discard anything.
+    }
+
+    public void discardData() {
+      LOG.info("TODO# discarding disk data (if any wasn't cached)");
+      for (CacheStripeData stripe : stripes) {
+        if (stripe.colStreams == null || stripe.colStreams.isEmpty()) continue;
+        for (List<CacheStreamData> streams : stripe.colStreams.values()) {
+          for (CacheStreamData cos : streams) {
+            for (MemoryBuffer buffer : cos.data) {
+              bufferManager.getAllocator().deallocate(buffer);
+            }
+          }
+        }
+        stripe.colStreams.clear();
+      }
+    }
+
+    @Override
+    public long getPhysicalStripeSize() {
+      return 0; // Always 0, no memory checks.
+    }
+
+    @Override
+    public boolean isCompressed() {
+      return false;
+    }
+
+    @Override
+    public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+      OutStream os = streams.get(name);
+      if (os != null) return os;
+      if (isNeeded(name)) {
+        LOG.info("Creating cache receiver for " + name);
+        CacheOutputReceiver or = new CacheOutputReceiver(bufferManager, name);
+        CacheOutStream cos = new CacheOutStream(name.toString(), bufferSize, null, or);
+        os = cos;
+        List<CacheOutStream> list = colStreams.get(name.getColumn());
+        if (list == null) {
+          list = new ArrayList<>();
+          colStreams.put(name.getColumn(), list);
+        }
+        list.add(cos);
+      } else {
+        LOG.info("Creating null receiver for " + name);
+        OutputReceiver or = new NullOutputReceiver(name);
+        os = new OutStream(name.toString(), bufferSize, null, or);
+      }
+      streams.put(name, os);
+      return os;
+    }
+
+    @Override
+    public void finalizeStripe(
+        OrcProto.StripeFooter.Builder footer,
+        OrcProto.StripeInformation.Builder dirEntry)
+        throws IOException {
+      List<ColumnEncoding> allEnc = footer.getColumnsList();
+      OrcProto.StripeInformation si = dirEntry.build();
+      LOG.info(("TODO# Finalizing stripe " + footer.build() + " => " + si).replace('\n', ' '));
+      currentStripe.encodings = new ArrayList<>(allEnc);
+      for (int i = 0; i < currentStripe.encodings.size(); ++i) {
+        // Don't record encodings for unneeded columns.
+        if (writerIncludes == null || writerIncludes[i]) continue;
+        currentStripe.encodings.set(i, null);
+      }
+      currentStripe.rowCount = si.getNumberOfRows();
+      // ORC writer reuses streams, so we need to clean them here and extract data.
+      for (Map.Entry<Integer, List<CacheOutStream>> e : colStreams.entrySet()) {
+        int colIx = e.getKey();
+        List<CacheOutStream> streams = e.getValue();
+        List<CacheStreamData> data = new ArrayList<>(streams.size());
+        for (CacheOutStream stream : streams) {
+          stream.flush();
+          List<MemoryBuffer> buffers = stream.receiver.buffers;
+          if (buffers == null) {
+            LOG.info("TODO# buffers are null for " + stream.receiver.name);
+          }
+          data.add(new CacheStreamData(stream.isSuppressed(), stream.receiver.name,
+              buffers == null ? new ArrayList<MemoryBuffer>() : new ArrayList<>(buffers)));
+          stream.clear();
+        }
+        currentStripe.colStreams.put(colIx, data);
+      }
+      startStripe();
+    }
+
+    @Override
+    public long estimateMemory() {
+      return 0; // We never ever use any memory.
+    }
+
+    @Override
+    public void writeIndexStream(StreamName name,
+        OrcProto.RowIndex.Builder rowIndex) throws IOException {
+      if (isNeeded(name)) {
+        // LOG.info("TODO# Saving index " + name);
+        // currentStripe.indexes.put(name.getColumn(), rowIndex.build());
+      } else {
+        // LOG.info("TODO# Ignoring index " + name + " => " + rowIndex);
+      }
+    }
+
+    private boolean isNeeded(StreamName name) {
+      return writerIncludes == null || writerIncludes[name.getColumn()];
+    }
+
+    @Override
+    public void writeBloomFilterStream(StreamName streamName,
+        OrcProto.BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+      // LOG.info("TODO# Ignoring bloom filter " + streamName + " => " + bloomFilterIndex);
+    }
+
+
+    @Override
+    public void flush() throws IOException {
+    }
+
+    @Override
+    public long getRawWriterPosition() throws IOException {
+      return -1; // Meaningless for this writer.
+    }
+
+    @Override
+    public void appendRawStripe(byte[] stripe, int offset, int length,
+        OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+      throw new UnsupportedOperationException(); // Only used in ACID writer.
+    }
+
+    public void setCurrentStripeOffsets(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long currentFileOffset) {
+      currentStripe.knownTornStart = currentKnownTornStart;
+      currentStripe.firstRowStart = firstStartOffset;
+      currentStripe.lastRowStart = lastStartOffset;
+      currentStripe.lastRowEnd = currentFileOffset;
+    }
+  }
+
+  private interface CacheOutput {
+    List<MemoryBuffer> getData();
+    StreamName getName();
+  }
+
+  private static final class CacheOutputReceiver implements CacheOutput, OutputReceiver {
+    private final BufferUsageManager bufferManager;
+    private final StreamName name;
+    private List<MemoryBuffer> buffers = null;
+    private int lastBufferPos = -1;
+
+    public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) {
+      this.bufferManager = bufferManager;
+      this.name = name;
+    }
+
+    public void clear() {
+      buffers = null;
+      lastBufferPos = -1;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      // TODO: avoid put() by working directly in OutStream?
+      LOG.info(name + " receiving a buffer of size " + buffer.remaining());
+      int size = buffer.remaining();
+      ByteBuffer bb = null;
+      if (buffers == null) {
+        buffers = new ArrayList<>();
+      }
+      if (!buffers.isEmpty()) {
+        MemoryBuffer lastBuffer = buffers.get(buffers.size() - 1);
+        bb = lastBuffer.getByteBufferRaw();
+        int written = lastBufferPos - bb.position();
+        if (bb.remaining() - written < size) {
+          lastBufferPos = -1;
+          bb = null;
+        }
+      }
+      boolean isNewBuffer = (lastBufferPos == -1);
+      if (isNewBuffer) {
+        MemoryBuffer[] dest = new MemoryBuffer[1];
+        bufferManager.getAllocator().allocateMultiple(dest, size);
+        LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
+        bb = newBuffer.getByteBufferRaw();
+        lastBufferPos = bb.position();
+        buffers.add(newBuffer);
+      }
+      // Since there's no close() here, maintain the initial read position between writes.
+      int pos = bb.position();
+      bb.position(lastBufferPos);
+      bb.put(buffer);
+      lastBufferPos = bb.position();
+      bb.position(pos);
+    }
+
+    @Override
+    public List<MemoryBuffer> getData() {
+      return buffers;
+    }
+
+    @Override
+    public StreamName getName() {
+      return name;
+    }
+  }
+
+  private static class NullOutputReceiver implements OutputReceiver {
+    private final StreamName name;
+
+    public NullOutputReceiver(StreamName name) {
+      this.name = name;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      LOG.info(name + " discarding a buffer of size " + buffer.remaining());
+    }
+  }
+
+  protected Void performDataRead() throws IOException {
+    try {
+      long startTime = counters.startTimeCounter();
+      LlapIoImpl.LOG.info("Processing data for {}", split.getPath());
+      if (processStop()) {
+        recordReaderTime(startTime);
+        return null;
+      }
+
+      Boolean isFromCache = null;
+      try {
+        isFromCache = readFileWithCache(startTime);
+      } finally {
+        if (cachedData != null && cachedData.getData() != null) {
+          for (StripeData sd : cachedData.getData()) {
+            unlockAllBuffers(sd);
+          }
+        }
+      }
+      if (isFromCache == null) return null; // Stop requested, and handled inside.
+      if (!isFromCache) {
+        if (!processOneFileSplit(split, startTime, Ref.from(0), null)) return null;
+      }
+
+      // Done with all the things.
+      recordReaderTime(startTime);
+      LOG.info("TODO# calling setDone");
+      consumer.setDone();
+
+      LlapIoImpl.LOG.trace("done processing {}", split);
+      return null;
+    } catch (Throwable e) {
+      LOG.error("TODO# threw", e);
+      consumer.setError(e);
+      throw e;
+    } finally {
+      cleanupReaders();
+    }
+  }
+
+  private void unlockAllBuffers(StripeData si) {
+    for (int i = 0; i < si.getData().length; ++i) {
+      LlapDataBuffer[][] colData = si.getData()[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          bufferManager.decRefBuffer(streamData[k]);
+        }
+      }
+    }
+  }
+
+  public void cacheFileData(StripeData sd) {
+    if (sd == null || sd.getEncodings() == null) return;
+    if (fileKey != null) {
+      // Note that we cache each slice separately. We could cache them together at the end, but
+      // then we won't be able to pass them to users without inc-refing explicitly.
+      ColumnEncoding[] encodings = sd.getEncodings();
+      for (int i = 0; i < encodings.length; ++i) {
+        // Make data consistent with encodings, don't store useless information.
+        if (sd.getData()[i] == null) {
+          encodings[i] = null;
+        }
+      }
+      FileData fd = new FileData(fileKey, encodings.length);
+      fd.addStripe(sd);
+      cache.putFileData(fd, Priority.NORMAL, counters);
+    } else {
+      lockAllBuffers(sd);
+    }
+    // We assume that if put/lock throws in the middle, it's ok to treat the buffers as not being
+    // locked and to blindly deallocate them, since they are not going to be used. Therefore
+    // we don't remove them from the cleanup list - we will do that after sending them to the
+    // consumer. This relies on the sequence of calls to cacheFileData and sendEcb..
+  }
+
+
+  private void lockAllBuffers(StripeData sd) {
+    for (int i = 0; i < sd.getData().length; ++i) {
+      LlapDataBuffer[][] colData = sd.getData()[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          boolean canLock = bufferManager.incRefBuffer(streamData[k]);
+          assert canLock;
+        }
+      }
+    }
+  }
+
+  public Boolean readFileWithCache(long startTime) throws IOException {
+    if (fileKey == null) return false;
+    BooleanRef gotAllData = new BooleanRef();
+    long endOfSplit = split.getStart() + split.getLength();
+    this.cachedData = cache.getFileData(fileKey, split.getStart(),
+        endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
+    if (cachedData == null) {
+      LOG.info("TODO# no data for the split found in cache");
+      return false;
+    }
+    String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
+    List<StripeData> slices = cachedData.getData();
+    if (slices.isEmpty()) return false;
+    long uncachedPrefixEnd = slices.get(0).getKnownTornStart(),
+        uncachedSuffixStart = slices.get(slices.size() - 1).getLastEnd();
+    Ref<Integer> stripeIx = Ref.from(0);
+    if (uncachedPrefixEnd > split.getStart()) {
+      // TODO: can we merge neighboring splits? So we don't init so many readers.
+      FileSplit sliceSplit = new FileSplit(split.getPath(), split.getStart(),
+          uncachedPrefixEnd - split.getStart(), hosts, inMemoryHosts);
+      if (!processOneFileSplit(sliceSplit, startTime, stripeIx, null)) return null;
+    }
+    while (!slices.isEmpty()) {
+      StripeData slice = slices.get(0);
+      long start = slice.getKnownTornStart();
+      long len = slice.getLastStart() - start; // Will also read the last row.
+      FileSplit sliceSplit = new FileSplit(split.getPath(), start, len, hosts, inMemoryHosts);
+      if (!processOneFileSplit(sliceSplit, startTime, stripeIx, slice)) return null;
+    }
+    boolean isUnfortunate = false;
+    if (uncachedSuffixStart == endOfSplit) {
+      // This is rather obscure. The end of the last cached row is precisely at the split end offset.
+      // If the split is in the middle of the file, LRR would read one more row after that,
+      // therefore as unfortunate as it is, we have to do a one-row read. However, for that to
+      // have happened, someone should have supplied a split that ends inside the last row, i.e.
+      // a few bytes earlier than the current split, which is pretty unlikely. What is more likely
+      // is that the split, and the last row, both end at the end of file. Check for this.
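+      // For example: if this split covers [0, 1000) and the last cached row ends exactly at
+      // offset 1000, then a file longer than 1000 bytes means LRR would still read the row
+      // starting at 1000 for this split, so one extra one-row read is needed; if the file is
+      // exactly 1000 bytes, nothing follows the cached data and no extra read is needed.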
+      long size =  split.getPath().getFileSystem(conf).getFileStatus(split.getPath()).getLen();
+      isUnfortunate = size > endOfSplit;
+      if (isUnfortunate) {
+        LOG.info("TODO# one-row mismatch at the end of split " + split.getPath() + " at "
+            + endOfSplit + "; file size is " + size);
+      }
+    }
+
+    if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
+      // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+      FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
+          endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
+      if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
+    }
+    return true;
+  }
+
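+  /**
+   * Processes a single file split, taking the columns present in the cached slice (if any) from
+   * the cache and reading the remaining included columns from the file.
+   */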
+  public boolean processOneFileSplit(FileSplit split, long startTime,
+      Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
+    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+    LlapIoImpl.LOG.info("TODO# Processing one split {" + split.getPath() + ", "
+        + split.getStart() + ", " + split.getLength() + "}; cache data " + slice);
+    boolean[] splitIncludes = writerIncludes;
+    boolean hasAllData = false;
+    if (cacheEncodings != null) {
+      hasAllData = true;
+      splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+      for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+        if (!splitIncludes[colIx]) continue;
+        assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
+        if (cacheEncodings[colIx] != null) {
+          splitIncludes[colIx] = false;
+        } else {
+          hasAllData = false;
+        }
+      }
+    }
+    LOG.info("TODO# includes accounting for cached data: before " + DebugUtils.toString(
+        writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
+
+    // We have 3 cases here:
+    // 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
+    // 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
+    // 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
+    if (!hasAllData) {
+      // This initializes cacheWriter with data.
+      readSplitFromFile(split, splitIncludes, slice);
+      assert cacheWriter != null;
+    }
+    if (slice != null) {
+      // If we had a cache range already, it should not have been split.
+      assert cacheWriter == null || cacheWriter.stripes.size() == 1;
+      CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+      boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
+      ++stripeIxRef.value;
+      return result;
+    } else {
+      for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+        if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+          return false;
+        }
+        ++stripeIxRef.value;
+      }
+      return true;
+    }
+  }
+
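+  /**
+   * Combines the disk-read stripe (if any) with the cached slice (if any) into a single ECB for
+   * the consumer, and prepares the newly read streams for caching.
+   */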
+  private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
+      int stripeIx, StripeData slice, long startTime) throws IOException {
+    String sliceStr = slice == null ? "null" : slice.toCoordinateString();
+    LOG.info("TODO# processing slice #" + stripeIx + " " + sliceStr + "; has"
+        + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
+        + " disk data");
+
+    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
+    LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
+    long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+    TextStripeMetadata metadata = new TextStripeMetadata(stripeIx);
+    StripeData sliceToCache = null;
+    boolean hasAllData = csd == null;
+    if (!hasAllData) {
+      if (slice == null) {
+        sliceToCache = new StripeData(
+            csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
+            csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
+      } else {
+        if (csd.rowCount != slice.getRowCount()) {
+          throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
+              + slice.getRowCount() + " from " + csd + " and " + slice);
+        }
+        if (csd.encodings.size() != slice.getEncodings().length) {
+          throw new IOException("Column count mismatch; disk " + csd.encodings.size()
+              + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
+        }
+        LOG.info("TODO# creating slice to cache in addition to an existing slice "
+            + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
+        sliceToCache = StripeData.duplicateForResults(slice);
+        for (int i = 0; i < csd.encodings.size(); ++i) {
+          sliceToCache.getEncodings()[i] = csd.encodings.get(i);
+        }
+        sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
+      }
+      metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
+      metadata.setRowCount(csd.rowCount);
+    } else {
+      assert cacheWriter == null;
+      metadata.setEncodings(Lists.newArrayList(cacheEncodings));
+      metadata.setRowCount(cacheRowCount);
+    }
+    LOG.info("TODO# derived stripe metadata for this split is " + metadata);
+    consumer.setStripeMetadata(metadata);
+
+    OrcEncodedColumnBatch ecb = ECB_POOL.take();
+    ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+    for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+      if (!writerIncludes[colIx]) continue;
+      ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+      if (!hasAllData && splitIncludes[colIx]) {
+        // The column has been read from disk.
+        List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
+        LOG.info("TODO# processing streams for column " + colIx + ": " + streams);
+        LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+            = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+        if (streams == null) continue; // Struct column, such as root?
+        Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+        while (iter.hasNext()) {
+          CacheWriter.CacheStreamData stream = iter.next();
+          if (stream.isSuppressed) {
+            LOG.info("TODO# removing a suppressed stream " + stream.name);
+            iter.remove();
+            discardUncachedBuffers(stream.data);
+            continue;
+          }
+          // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+          ColumnStreamData cb = CSD_POOL.take();
+          cb.incRef();
+          int streamIx = stream.name.getKind().getNumber();
+          cb.setCacheBuffers(stream.data);
+          // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+          newCacheDataForCol[streamIx] = stream.data.toArray(
+              new LlapDataBuffer[stream.data.size()]);
+          ecb.setStreamData(colIx, streamIx, cb);
+        }
+      } else {
+        // The column has been obtained from cache.
+        LlapDataBuffer[][] colData = cacheData[colIx];
+        LOG.info("TODO# processing cache data for column " + colIx + ": " + SerDeLowLevelCacheImpl.toString(colData));
+        for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+          if (colData[streamIx] == null) continue;
+          ColumnStreamData cb = CSD_POOL.take();
+          cb.incRef();
+          cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+          ecb.setStreamData(colIx, streamIx, cb);
+        }
+      }
+    }
+    if (processStop()) {
+      recordReaderTime(startTime);
+      return false;
+    }
+    // Note: we cache slices one by one since we need to lock them before sending them to the
+    //       consumer. We could lock here, then cache them together, then unlock here and in
+    //       returnData, but for now we just rely on the cache put to lock them before we send
+    //       them over.
+    LOG.info("TODO# Data to cache from the read " + sliceToCache);
+    cacheFileData(sliceToCache);
+    return sendEcbToConsumer(ecb, slice != null, csd);
+  }
+
+  private void discardUncachedBuffers(List<MemoryBuffer> list) {
+    for (MemoryBuffer buffer : list) {
+      boolean isInvalidated = ((LlapDataBuffer)buffer).invalidate();
+      assert isInvalidated;
+      bufferManager.getAllocator().deallocate(buffer);
+    }
+  }
+
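+  // Merges encodings from the cache and from the writer; where both are present, they must match.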
+  private static List<ColumnEncoding> combineCacheAndWriterEncodings(
+      ColumnEncoding[] cacheEncodings, List<ColumnEncoding> writerEncodings) throws IOException {
+    // TODO: refactor with cache impl? it has the same merge logic
+    if (cacheEncodings == null) {
+      return new ArrayList<>(writerEncodings);
+    }
+    if (cacheEncodings.length != writerEncodings.size()) {
+      throw new IOException("Incompatible encoding lengths: "
+          + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+    }
+    ColumnEncoding[] combinedEncodings = Arrays.copyOf(cacheEncodings, cacheEncodings.length);
+    for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+      ColumnEncoding newEncoding = writerEncodings.get(colIx);
+      if (newEncoding == null) continue;
+      if (combinedEncodings[colIx] != null && !newEncoding.equals(combinedEncodings[colIx])) {
+        throw new IOException("Incompatible encodings at " + colIx + ": "
+            + Arrays.toString(cacheEncodings) + " vs " + writerEncodings);
+      }
+      combinedEncodings[colIx] = newEncoding;
+    }
+    return Lists.newArrayList(combinedEncodings);
+  }
+
+
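+  /**
+   * Reads the split via the source InputFormat and SerDe, re-encoding the rows with an in-memory
+   * ORC writer backed by CacheWriter, and records slice boundaries for caching.
+   */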
+  public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
+      throws IOException {
+    boolean maySplitTheSplit = slice == null;
+    ReaderWithOffsets offsetReader = null;
+    @SuppressWarnings("rawtypes")
+    RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
+    try {
+      LOG.info("TODO# using " + sourceReader.getClass().getSimpleName() + " to read data");
+      // TODO# need a more general approach to this. At least, need to factor this out and add configs.
+      if (sourceReader instanceof LineRecordReader) {
+        offsetReader = LineRrOffsetReader.create((LineRecordReader)sourceReader);
+        maySplitTheSplit = maySplitTheSplit && offsetReader != null;
+        sourceReader = null;
+      } else {
+        offsetReader = new PassThruOffsetReader(sourceReader);
+        sourceReader = null;
+      }
+      ObjectInspector sourceOi;
+      try {
+        sourceOi = sourceSerDe.getObjectInspector();
+      } catch (SerDeException e) {
+        throw new IOException(e);
+      }
+
+      // TODO: ideally, we want to transform the rows to only have the included columns, and only
+      //       write those to the writer with a modified schema, then map back to the full set later.
+      WriterOptions opts = OrcFile.writerOptions(conf)
+          .stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+          .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
+          .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
+          .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi);
+
+
+      // Column IDs are only used to verify eventual writer includes.
+      cacheWriter = new CacheWriter(bufferManager, allocSize, columnIds, splitIncludes);
+      orcWriter = new WriterImpl(cacheWriter, null, opts);
+      int rowsPerSlice = 0;
+      long currentKnownTornStart = split.getStart();
+      long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+      while (offsetReader.next()) {
+        Writable value = offsetReader.getCurrentValue();
+        lastStartOffset = offsetReader.getCurrentRowStartOffset();
+        if (firstStartOffset == Long.MIN_VALUE) {
+          firstStartOffset = lastStartOffset;
+        }
+        Object row = null;
+        try {
+          row = sourceSerDe.deserialize(value);
+        } catch (SerDeException e) {
+          throw new IOException(e);
+        }
+        orcWriter.addRow(row);
+        if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+          long fileOffset = offsetReader.getCurrentFileOffset();
+          // Must support offsets to be able to split.
+          if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+            throw new AssertionError("Unable to get offsets from "
+                + offsetReader.getClass().getSimpleName());
+          }
+          cacheWriter.setCurrentStripeOffsets(
+              currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+          // Split starting at row start will not read that row.
+          currentKnownTornStart = lastStartOffset;
+          // Row offsets will be determined from the reader (we could set the first from last).
+          lastStartOffset = Long.MIN_VALUE;
+          firstStartOffset = Long.MIN_VALUE;
+          rowsPerSlice = 0;
+          orcWriter.writeIntermediateFooter();
+        }
+      }
+      if (rowsPerSlice > 0) {
+        long fileOffset = offsetReader.getCurrentFileOffset();
+        if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+          // The reader doesn't support offsets.
+          // TODO## Double-check that these shenanigans are correct wrt cache matching.
+          //       We want to match the exact same splits, and not match anything else ever.
+          //       Perhaps we should just add a flag that would allow us to match exactly.
+          // If the cached split started at a row start, that row would be skipped, so +1.
+          firstStartOffset = split.getStart() + 1;
+          // Last row starting at the end of the split would be read.
+          lastStartOffset = split.getStart() + split.getLength();
+          // However, it must end after the split end, otherwise the next one would have been read.
+          fileOffset = lastStartOffset + 1;
+          LOG.info("TODO# setting fake cache offsets based on split offsets - 'first row' at "
+              + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+        }
+        cacheWriter.setCurrentStripeOffsets(
+            currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      }
+      // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
+      orcWriter.close();
+      orcWriter = null;
+    } finally {
+      // We don't need the source reader anymore.
+      if (offsetReader != null) {
+        try {
+          offsetReader.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close source reader", ex);
+        }
+      } else {
+        assert sourceReader != null;
+        try {
+          sourceReader.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close source reader", ex);
+        }
+      }
+    }
+  }
+
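+  // Returns the split locations that match the requested in-memory flag.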
+  private static String[] extractHosts(FileSplit split, boolean isInMemory) throws IOException {
+    SplitLocationInfo[] locInfo = split.getLocationInfo();
+    if (locInfo == null) return new String[0];
+    List<String> hosts = null; // TODO: usually there are no in-memory locations. Use an array?
+    for (int i = 0; i < locInfo.length; i++) {
+      if (locInfo[i].isInMemory() != isInMemory) continue;
+      if (hosts == null) {
+        hosts = new ArrayList<>();
+      }
+      hosts.add(locInfo[i].getLocation());
+    }
+    if (hosts == null) return new String[0];
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
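+  // Hands the ECB to the consumer and drops the references to data that no longer needs
+  // cleanup (the cached slice that was just consumed and the writer's stream lists).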
+  private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
+      boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+    if (ecb == null) { // This basically means stop has been called.
+      cleanupReaders();
+      return false;
+    }
+    LOG.info("TODO# Sending over the ecb");
+    try {
+      consumer.consumeData(ecb);
+    } catch (Throwable ex) {
+      LOG.info("TODO# threw", ex);
+      consumer.setError(ex); // TODO## this is wrong, it shouldn't throw
+    }
+    if (hasCachedSlice) {
+      cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
+    }
+    if (writerData != null) {
+      writerData.colStreams.clear();
+    }
+    return true;
+  }
+
+
+  private void cleanupReaders() {
+    if (orcWriter != null) {
+      try {
+        orcWriter.close();
+        orcWriter = null;
+      } catch (Exception ex) {
+        LOG.error("Failed to close ORC writer", ex);
+      }
+    }
+    if (cacheWriter != null) {
+      try {
+        cacheWriter.discardData();
+        cacheWriter = null;
+      } catch (Exception ex) {
+        LOG.error("Failed to close cache writer", ex);
+      }
+    }
+  }
+
+  private void recordReaderTime(long startTime) {
+    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
+  }
+
+  private boolean processStop() {
+    if (!isStopped) return false;
+    LOG.info("SerDe-based data reader is stopping");
+    cleanupReaders();
+    return true;
+  }
+
+  private static Object determineFileId(FileSystem fs, FileSplit split,
+      boolean allowSynthetic) throws IOException {
+    /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
+      Object fileKey = ((OrcSplit)split).getFileKey();
+      if (fileKey != null) return fileKey; */
+    LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
+    return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
+  }
+
+  // TODO: move to a superclass?
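+  /** Releases the buffer refs taken for the consumer and returns the pooled objects for reuse. */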
+  @Override
+  public void returnData(OrcEncodedColumnBatch ecb) {
+    for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
+      if (!ecb.hasData(colIx)) continue;
+      ColumnStreamData[] datas = ecb.getColumnData(colIx);
+      for (ColumnStreamData data : datas) {
+        if (data == null || data.decRef() != 0) continue;
+        if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+          for (MemoryBuffer buf : data.getCacheBuffers()) {
+            LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} at the end of processing", buf);
+          }
+        }
+        bufferManager.decRefBuffers(data.getCacheBuffers());
+        CSD_POOL.offer(data);
+      }
+    }
+    // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
+    ECB_POOL.offer(ecb);
+  }
+
+  public TezCounters getTezCounters() {
+    return counters.getTezCounters();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
new file mode 100644
index 0000000..040f1a7
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerFileMetadata.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcProto.Type;
+import org.apache.orc.TypeDescription;
+
+public interface ConsumerFileMetadata {
+  int getStripeCount();
+  CompressionKind getCompressionKind();
+  List<Type> getTypes();
+  TypeDescription getSchema();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
new file mode 100644
index 0000000..1e28f5f
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ConsumerStripeMetadata.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.RowIndexEntry;
+
+public interface ConsumerStripeMetadata {
+  int getStripeIx();
+  long getRowCount();
+  List<ColumnEncoding> getEncodings();
+  String getWriterTimezone();
+  RowIndexEntry getRowIndexEntry(int colIx, int rgIx); // TODO: remove?
+  RowIndex[] getRowIndexes();
+  boolean supportsRowIndexes();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 70cba05..2c7a234 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -41,7 +41,8 @@ import org.apache.orc.impl.ReaderImpl;
  * of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
  * or instead use protobuf structs everywhere instead of the mix of things like now.
  */
-public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMetadata {
+public final class OrcFileMetadata extends LlapCacheableBuffer
+    implements FileMetadata, ConsumerFileMetadata {
   private final List<StripeInformation> stripes;
   private final List<Integer> versionList;
   private final List<OrcProto.StripeStatistics> stripeStats;
@@ -225,6 +226,11 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
     return fileStats;
   }
 
+  @Override
+  public int getStripeCount() {
+    return stripes.size();
+  }
+
   public TypeDescription getSchema() {
     return OrcUtils.convertTypeFromProtobuf(this.types, 0);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 3f4f43b..73a1721 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.MemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 
 public class OrcMetadataCache {
   private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
@@ -116,7 +117,6 @@ public class OrcMetadataCache {
     return touchOnGet(metadata.get(fileKey));
   }
 
-
   private <T extends LlapCacheableBuffer> T touchOnGet(T result) {
     if (result != null) {
       policy.notifyLock(result);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 6f0b9ff..5ef1678 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -32,10 +32,11 @@ import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.RowIndexEntry;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.OrcIndex;
 
-public class OrcStripeMetadata extends LlapCacheableBuffer {
+public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata {
   private final OrcBatchKey stripeKey;
   private final List<OrcProto.ColumnEncoding> encodings;
   private final List<OrcProto.Stream> streams;
@@ -172,4 +173,14 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
   public void resetRowIndex() {
     rowIndex = null;
   }
+
+  @Override
+  public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+    return rowIndex.getRowGroupIndex()[colIx].getEntry(rgIx);
+  }
+
+  @Override
+  public boolean supportsRowIndexes() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index dc83b9c..4f02926 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.orc.OrcProto.Type.Builder;
 import org.apache.orc.impl.ReaderImpl;
 
 import com.google.common.collect.Lists;
@@ -538,4 +539,86 @@ public class OrcUtils {
     }
     return result;
   }
+
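+  /**
+   * Sets up the protobuf Type builder from the given TypeDescription node (factored out of
+   * WriterImpl so it can be shared); returns the node's children, if any.
+   */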
+  public static List<TypeDescription> setTypeBuilderFromSchema(
+      OrcProto.Type.Builder type, TypeDescription schema) {
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(OrcProto.Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
+        break;
+      case LIST:
+        type.setKind(OrcProto.Type.Kind.LIST);
+        type.addSubtypes(children.get(0).getId());
+        break;
+      case MAP:
+        type.setKind(OrcProto.Type.Kind.MAP);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        break;
+      case STRUCT:
+        type.setKind(OrcProto.Type.Kind.STRUCT);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        for(String field: schema.getFieldNames()) {
+          type.addFieldNames(field);
+        }
+        break;
+      case UNION:
+        type.setKind(OrcProto.Type.Kind.UNION);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown category: " +
+          schema.getCategory());
+    }
+    return children;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
index 83742e4..5ba1b9b 100644
--- a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
+++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
@@ -19,7 +19,6 @@
 package org.apache.orc.impl;
 
 import java.io.IOException;
-import java.util.EnumSet;
 
 import org.apache.orc.OrcProto.BloomFilterIndex;
 import org.apache.orc.OrcProto.Footer;

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 975804b..9433e54 100644
--- a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -232,6 +232,11 @@ public class RecordReaderImpl implements RecordReader {
     public long getNext() {
       return entry.getPositions(index++);
     }
+
+    @Override
+    public String toString() {
+      return "{" + entry.getPositionsList() + "; " + index + "}";
+    }
   }
 
   public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 484209b..4bb51c3 100644
--- a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1223,6 +1223,7 @@ public class TreeReaderFactory {
     }
   }
 
+  private static final org.slf4j.Logger LOG =
+      org.slf4j.LoggerFactory.getLogger(TreeReaderFactory.class);
   // This class collects together very similar methods for reading an ORC vector of byte arrays and
   // creating the BytesColumnVector.
   //

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b17fb41..e0fcae7 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -125,10 +125,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   public WriterImpl(FileSystem fs,
                     Path path,
                     OrcFile.WriterOptions opts) throws IOException {
-    this.path = path;
+    this(new PhysicalFsWriter(fs, path, opts.getSchema().getMaximumId() + 1, opts), path, opts);
+  }
+
+  public WriterImpl(PhysicalWriter writer,
+                    Path pathForMem,
+                    OrcFile.WriterOptions opts) throws IOException {
+    this.physWriter = writer;
+    this.path = pathForMem;
     this.conf = opts.getConfiguration();
-    this.callback = opts.getCallback();
     this.schema = opts.getSchema();
+    this.callback = opts.getCallback();
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
 
@@ -153,8 +160,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
     }
     this.bloomFilterFpp = opts.getBloomFilterFpp();
-    int numColumns = schema.getMaximumId() + 1;
-    physWriter = new PhysicalFsWriter(fs, path, numColumns, opts);
     treeWriter = createTreeWriter(schema, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
@@ -162,7 +167,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, opts.getStripeSize(), this);
+    if (path != null) {
+      memoryManager.addWriter(path, opts.getStripeSize(), this);
+    }
   }
 
   @Override
@@ -2112,83 +2119,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private static void writeTypes(OrcProto.Footer.Builder builder,
                                  TypeDescription schema) {
     OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    List<TypeDescription> children = schema.getChildren();
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        type.setKind(OrcProto.Type.Kind.BOOLEAN);
-        break;
-      case BYTE:
-        type.setKind(OrcProto.Type.Kind.BYTE);
-        break;
-      case SHORT:
-        type.setKind(OrcProto.Type.Kind.SHORT);
-        break;
-      case INT:
-        type.setKind(OrcProto.Type.Kind.INT);
-        break;
-      case LONG:
-        type.setKind(OrcProto.Type.Kind.LONG);
-        break;
-      case FLOAT:
-        type.setKind(OrcProto.Type.Kind.FLOAT);
-        break;
-      case DOUBLE:
-        type.setKind(OrcProto.Type.Kind.DOUBLE);
-        break;
-      case STRING:
-        type.setKind(OrcProto.Type.Kind.STRING);
-        break;
-      case CHAR:
-        type.setKind(OrcProto.Type.Kind.CHAR);
-        type.setMaximumLength(schema.getMaxLength());
-        break;
-      case VARCHAR:
-        type.setKind(OrcProto.Type.Kind.VARCHAR);
-        type.setMaximumLength(schema.getMaxLength());
-        break;
-      case BINARY:
-        type.setKind(OrcProto.Type.Kind.BINARY);
-        break;
-      case TIMESTAMP:
-        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-        break;
-      case DATE:
-        type.setKind(OrcProto.Type.Kind.DATE);
-        break;
-      case DECIMAL:
-        type.setKind(OrcProto.Type.Kind.DECIMAL);
-        type.setPrecision(schema.getPrecision());
-        type.setScale(schema.getScale());
-        break;
-      case LIST:
-        type.setKind(OrcProto.Type.Kind.LIST);
-        type.addSubtypes(children.get(0).getId());
-        break;
-      case MAP:
-        type.setKind(OrcProto.Type.Kind.MAP);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        break;
-      case STRUCT:
-        type.setKind(OrcProto.Type.Kind.STRUCT);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        for(String field: schema.getFieldNames()) {
-          type.addFieldNames(field);
-        }
-        break;
-      case UNION:
-        type.setKind(OrcProto.Type.Kind.UNION);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown category: " +
-          schema.getCategory());
-    }
+    List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema);
     builder.addTypes(type);
     if (children != null) {
       for(TypeDescription child: children) {
@@ -2380,7 +2311,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       callback.preFooterWrite(callbackContext);
     }
     // remove us from the memory manager so that we don't get any callbacks
-    memoryManager.removeWriter(path);
+    if (path != null) {
+      memoryManager.removeWriter(path);
+    }
     // actually close the file
     flushStripe();
     writeMetadata();

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 3d81e43..b9c39de 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.hive.llap;
  */
 public class DebugUtils {
   public static String toString(boolean[] a) {
+    if (a == null) return "null";
     StringBuilder b = new StringBuilder();
     b.append('[');
     for (int i = 0; i < a.length; ++i) {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..004bb2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -225,7 +225,7 @@ public class FetchOperator implements Serializable {
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return HiveInputFormat.wrapForLlap(format, conf);
+    return format;
   }
 
   private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 37e4b9b..46270bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -327,7 +327,7 @@ public final class Utilities {
     return null;
   }
 
-  public static BaseWork getMergeWork(JobConf jconf) {
+  public static BaseWork getMergeWork(Configuration jconf) {
     if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
         || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
       return null;
@@ -335,7 +335,7 @@ public final class Utilities {
     return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
   }
 
-  public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+  public static BaseWork getMergeWork(Configuration jconf, String prefix) {
     if (prefix == null || prefix.isEmpty()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index ba25573..de36f2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -19,11 +19,20 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileSplit;
@@ -42,6 +51,9 @@ import org.apache.hadoop.mapred.lib.CombineFileSplit;
  */
 public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
     extends HiveContextAwareRecordReader<K, V> {
+  private static final org.slf4j.Logger LOG =
+      org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class);
+
+  private LinkedHashMap<Path, PartitionDesc> pathToPartInfo;
 
   public CombineHiveRecordReader(InputSplit split, Configuration conf,
       Reporter reporter, Integer partition, RecordReader preReader) throws IOException {
@@ -57,8 +69,27 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
       throw new IOException("CombineHiveRecordReader: class not found "
           + inputFormatClassName);
     }
-    InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
-        inputFormatClass, jobConf);
+    InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+    try {
+      // TODO: refactor this out
+      if (pathToPartInfo == null) {
+        MapWork mrwork;
+        if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+          mrwork = (MapWork) Utilities.getMergeWork(jobConf);
+          if (mrwork == null) {
+            mrwork = Utilities.getMapWork(jobConf);
+          }
+        } else {
+          mrwork = Utilities.getMapWork(jobConf);
+        }
+        pathToPartInfo = mrwork.getPathToPartitionInfo();
+      }
+
+      PartitionDesc part = extractSinglePartSpec(hsplit);
+      inputFormat = HiveInputFormat.wrapForLlap(inputFormat, jobConf, part);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
 
     // create a split for the given partition
     FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
@@ -79,6 +110,26 @@ public class CombineHiveRecordReader<K extends WritableComparable, V extends Wri
 
   }
 
+  private PartitionDesc extractSinglePartSpec(CombineHiveInputSplit hsplit) throws IOException {
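+    // All paths in a combine split are expected to map to the same partition; if they do not,
+    // return null, which makes wrapForLlap skip LLAP IO for SerDe-based input formats.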
+    PartitionDesc part = null;
+    Map<Map<Path,PartitionDesc>, Map<Path,PartitionDesc>> cache = new HashMap<>();
+    for (Path path : hsplit.getPaths()) {
+      PartitionDesc otherPart = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+          pathToPartInfo, path, cache);
+      LOG.debug("Found spec for " + path + " " + otherPart + " from " + pathToPartInfo);
+      if (part == null) {
+        part = otherPart;
+      } else if (otherPart != part) { // Assume we should have the exact same object.
+        // TODO: we could also compare the schema and SerDe, and pass only those to the call
+        //       instead; most of the time these would be the same and LLAP IO can handle that.
+        LOG.warn("Multiple partitions found; not going to pass a part spec to LLAP IO: {"
+            + part.getPartSpec() + "} and {" + otherPart.getPartSpec() + "}");
+        return null;
+      }
+    }
+    return part;
+  }
+
   @Override
   public void doClose() throws IOException {
     recordReader.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 94fcd60..f8391e0 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -202,7 +204,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   public static InputFormat<WritableComparable, Writable> wrapForLlap(
-      InputFormat<WritableComparable, Writable> inputFormat, Configuration conf) {
+      InputFormat<WritableComparable, Writable> inputFormat, Configuration conf,
+      PartitionDesc part) throws HiveException {
     if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
       return inputFormat; // LLAP not enabled, no-op.
     }
@@ -227,7 +230,20 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       LOG.info("Not using LLAP IO because it is not initialized");
       return inputFormat;
     }
-    return castInputFormat(llapIo.getInputFormat(inputFormat));
+    LlapWrappableInputFormatInterface llapIf = (LlapWrappableInputFormatInterface)inputFormat;
+    Deserializer serde = null;
+    if (llapIf.isSerdeBased()) {
+      if (part == null) {
+        LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF", new Exception());
+        return inputFormat;
+      }
+      try {
+        serde = part.getDeserializer(conf);
+      } catch (Exception e) {
+        throw new HiveException("Error creating SerDe for LLAP IO", e);
+      }
+    }
+    return castInputFormat(llapIo.getInputFormat(inputFormat, serde));
   }
 
 
@@ -248,7 +264,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return (InputFormat<T, U>)from;
   }
 
-
+  /** NOTE: this no longer wraps the IF for LLAP. Call wrapForLlap manually if needed. */
   public static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
     Class inputFormatClass, JobConf job) throws IOException {
     InputFormat<WritableComparable, Writable> instance = inputFormats.get(inputFormatClass);
@@ -266,7 +282,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
             + inputFormatClass.getName() + " as specified in mapredWork!", e);
       }
     }
-    return wrapForLlap(instance, job);
+    return instance;
   }
 
   public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -287,15 +303,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
 
     boolean nonNative = false;
-    PartitionDesc part = pathToPartitionInfo.get(hsplit.getPath());
+    PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+        pathToPartitionInfo, hsplit.getPath(), null);
+    LOG.debug("Found spec for " + hsplit.getPath() + " " + part + " from " + pathToPartitionInfo);
+
     if ((part != null) && (part.getTableDesc() != null)) {
       Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), job);
       nonNative = part.getTableDesc().isNonNative();
     }
 
-    pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(), nonNative);
+    Path splitPath = hsplit.getPath();
+    pushProjectionsAndFilters(job, inputFormatClass, splitPath, nonNative);
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+    try {
+      inputFormat = HiveInputFormat.wrapForLlap(inputFormat, job, part);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
     RecordReader innerReader = null;
     try {
       innerReader = inputFormat.getRecordReader(inputSplit, job, reporter);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
index 66e1f90..f168f3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapWrappableInputFormatInterface.java
@@ -18,5 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-/** Marker interface for LLAP; serves no other purpose. */
-public interface LlapWrappableInputFormatInterface {}
+/** Marker interface for LLAP IO. */
+public interface LlapWrappableInputFormatInterface {
+  boolean isSerdeBased();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 361901e..2a89c03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -332,6 +332,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                              List<Integer> included) {
 
     boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+    if (included == null) {
+      Arrays.fill(result, true);
+      return result;
+    }
     result[0] = true;
     List<TypeDescription> children = readerSchema.getChildren();
     for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
@@ -2482,4 +2486,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
     return new OrcOiBatchToRowReader(vrr, vrbCtx, includedCols);
   }
+
+
+  @Override
+  public boolean isSerdeBased() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 075c3b4..3e4ec2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -93,14 +93,28 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
              OrcFile.WriterOptions opts) throws IOException {
     super(fs, path, opts);
     this.inspector = opts.getInspector();
-    internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    this.fields = initializeFieldsFromOi(inspector);
+  }
+
+  public WriterImpl(PhysicalWriter writer,
+                    Path pathForMem,
+                    OrcFile.WriterOptions opts) throws IOException {
+    super(writer, pathForMem, opts);
+    this.inspector = opts.getInspector();
+    this.internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    this.fields = initializeFieldsFromOi(inspector);
+  }
+
+  private static StructField[] initializeFieldsFromOi(ObjectInspector inspector) {
     if (inspector instanceof StructObjectInspector) {
       List<? extends StructField> fieldList =
           ((StructObjectInspector) inspector).getAllStructFieldRefs();
-      fields = new StructField[fieldList.size()];
+      StructField[] fields = new StructField[fieldList.size()];
       fieldList.toArray(fields);
+      return fields;
     } else {
-      fields = null;
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index 2325140..8857d3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -83,4 +83,8 @@ public class CacheChunk extends DiskRangeList {
   public void reset() {
     init(null, -1, -1);
   }
+
+  public void adjustEnd(long l) {
+    this.end += l;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index d5f5f9d..0dba1a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -1775,7 +1775,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
           " present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
           columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
-          data != null, dictionary != null, lengths != null, secondary != null, tz);
+          data, dictionary != null, lengths != null, secondary != null, tz);
     }
     switch (schema.getCategory()) {
       case BINARY:


[4/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/682a3c7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/682a3c7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/682a3c7b

Branch: refs/heads/master-15147
Commit: 682a3c7b46aec9e43275551698fa6ba9c7ac5d7c
Parents: 7f46c8d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 16 18:57:28 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 16 18:57:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    7 +
 .../apache/hadoop/hive/llap/io/api/LlapIo.java  |    3 +-
 .../llap/IncrementalObjectSizeEstimator.java    |    2 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |    3 +-
 .../hive/llap/cache/EvictionDispatcher.java     |   11 +-
 .../hadoop/hive/llap/cache/FileCache.java       |  107 ++
 .../hive/llap/cache/FileCacheCleanupThread.java |  104 ++
 .../hadoop/hive/llap/cache/LlapDataBuffer.java  |    7 +
 .../hive/llap/cache/LowLevelCacheImpl.java      |  218 +--
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java |  716 ++++++++++
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  348 +----
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   26 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java |  335 +++++
 .../llap/io/decode/ColumnVectorProducer.java    |   11 +-
 .../llap/io/decode/EncodedDataConsumer.java     |    7 +
 .../io/decode/GenericColumnVectorProducer.java  |  201 +++
 .../llap/io/decode/LlapTextInputFormat.java     |   33 +
 .../llap/io/decode/OrcColumnVectorProducer.java |   12 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   95 +-
 .../hive/llap/io/decode/ReadPipeline.java       |    2 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java | 1248 ++++++++++++++++++
 .../llap/io/metadata/ConsumerFileMetadata.java  |   31 +
 .../io/metadata/ConsumerStripeMetadata.java     |   35 +
 .../hive/llap/io/metadata/OrcFileMetadata.java  |    8 +-
 .../hive/llap/io/metadata/OrcMetadataCache.java |    2 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |   13 +-
 orc/src/java/org/apache/orc/OrcUtils.java       |   83 ++
 .../org/apache/orc/impl/PhysicalWriter.java     |    1 -
 .../org/apache/orc/impl/RecordReaderImpl.java   |    5 +
 .../org/apache/orc/impl/TreeReaderFactory.java  |    1 +
 .../java/org/apache/orc/impl/WriterImpl.java    |   99 +-
 .../org/apache/hadoop/hive/llap/DebugUtils.java |    1 +
 .../hadoop/hive/ql/exec/FetchOperator.java      |    2 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |    4 +-
 .../hive/ql/io/CombineHiveRecordReader.java     |   55 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   37 +-
 .../io/LlapWrappableInputFormatInterface.java   |    6 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   10 +
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |   20 +-
 .../hive/ql/io/orc/encoded/CacheChunk.java      |    4 +
 .../orc/encoded/EncodedTreeReaderFactory.java   |    2 +-
 .../hive/ql/io/orc/encoded/StreamUtils.java     |   13 +-
 .../ql/io/rcfile/stats/PartialScanMapper.java   |    5 +-
 ql/src/test/queries/clientpositive/llap_text.q  |   62 +
 .../test/results/clientpositive/llap_text.q.out |  502 +++++++
 .../hadoop/hive/common/io/DiskRangeList.java    |    6 +
 .../common/io/encoded/EncodedColumnBatch.java   |   17 +
 47 files changed, 3898 insertions(+), 622 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..9806105 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2900,6 +2900,13 @@ public class HiveConf extends Configuration {
     LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
         "Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
         "cases of file overwrites. This is supported on HDFS."),
+    LLAP_IO_ENCODE_ALLOC_SIZE("hive.llap.io.encode.alloc.size", "256Kb", new SizeValidator(),
+        "Allocation size for the buffers used to cache encoded data from non-ORC files. Must\n" +
+        "be a power of two between " + LLAP_ALLOCATOR_MIN_ALLOC + " and\n" +
+        LLAP_ALLOCATOR_MAX_ALLOC + "."),
+    LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
+        "Row count to use to separate cache slices when caching encoded data from row-based\n" +
+        "inputs into LLAP cache."),
     LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
         "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,

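For reference, a minimal configuration sketch (not part of this patch) showing how the two new settings above could be pinned programmatically. Only the ConfVars names come from the hunk; the class and method below are illustrative assumptions, and "256Kb" / 100000 are simply the defaults being restated.

  import org.apache.hadoop.hive.conf.HiveConf;

  class LlapEncodeConfSketch {
    // Returns a HiveConf with the new encode-cache settings set to their defaults;
    // the string value goes through the same SizeValidator as a hive-site.xml entry.
    static HiveConf withEncodeCacheDefaults() {
      HiveConf conf = new HiveConf();
      conf.set(HiveConf.ConfVars.LLAP_IO_ENCODE_ALLOC_SIZE.varname, "256Kb");
      conf.setInt(HiveConf.ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT.varname, 100000);
      return conf;
    }
  }
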
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index d82757f..e5ab601 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -18,10 +18,11 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 
 public interface LlapIo<T> {
-  InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat);
+  InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde);
   void close();
 }

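A caller-side sketch of the widened interface (illustrative only; the generic helper below is an assumption, not code from this patch). The extra Deserializer argument is what lets the IO layer encode row-based, non-columnar formats into the cache.

  import org.apache.hadoop.hive.llap.io.api.LlapIo;
  import org.apache.hadoop.hive.serde2.Deserializer;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapred.InputFormat;

  class LlapIoWrapSketch {
    // Wraps the source format with LLAP IO when an LlapIo instance is available;
    // callers that cannot supply a serde simply keep using the source format.
    static <T> InputFormat<NullWritable, T> maybeWrap(
        LlapIo<T> llapIo, InputFormat<?, ?> source, Deserializer serde) {
      return (llapIo == null) ? null : llapIo.getInputFormat(source, serde);
    }
  }
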
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index 3efbcc2..ff6e7ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
  * during the actual estimation. TODO: clean up
  */
 public class IncrementalObjectSizeEstimator {
-  private static final JavaDataModel memoryModel = JavaDataModel.get();
+  public static final JavaDataModel memoryModel = JavaDataModel.get();
   private enum FieldType { PRIMITIVE_ARRAY, OBJECT_ARRAY, COLLECTION, MAP, OTHER };
 
   public static HashMap<Class<?>, ObjectEstimator> createEstimators(Object rootObj) {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index d9d407d..8d7f0d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -611,7 +611,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       assert data != null;
       int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
           freeListIx = freeListFromHeader(headers[headerIx]);
-      assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2);
+      assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
+          : buffer.allocSize + " " + freeListIx;
       while (true) {
         FreeList freeList = freeLists[freeListIx];
         int bHeaderIx = headerIx ^ (1 << freeListIx);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index b6fd3e3..2d3197c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -27,11 +27,16 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
  */
 public final class EvictionDispatcher implements EvictionListener {
   private final LowLevelCache dataCache;
+  private final SerDeLowLevelCacheImpl serdeCache;
   private final OrcMetadataCache metadataCache;
+  private final EvictionAwareAllocator allocator;
 
-  public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) {
+  public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
+      OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
     this.dataCache = dataCache;
     this.metadataCache = metadataCache;
+    this.serdeCache = serdeCache;
+    this.allocator = allocator;
   }
 
   @Override
@@ -40,7 +45,11 @@ public final class EvictionDispatcher implements EvictionListener {
   }
 
   public void notifyEvicted(LlapDataBuffer buffer) {
+    // Note: we don't know which cache this is from, so we notify both. They can noop if they
+    //       want to find the buffer in their structures and can't.
     dataCache.notifyEvicted(buffer);
+    serdeCache.notifyEvicted(buffer);
+    allocator.deallocateEvicted(buffer);
   }
 
   public void notifyEvicted(OrcFileMetadata buffer) {

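A wiring sketch for the new constructor (not from the commit; it assumes a class in the org.apache.hadoop.hive.llap.cache package, and the helper name is made up - only the types come from the diff above).

  package org.apache.hadoop.hive.llap.cache;

  import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;

  class EvictionWiringSketch {
    // One dispatcher serves both data caches: each notifyEvicted no-ops if it does
    // not own the buffer, and the allocator reclaims the memory exactly once here
    // rather than inside the individual caches.
    static EvictionDispatcher wire(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
        OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
      return new EvictionDispatcher(dataCache, serdeCache, metadataCache, allocator);
    }
  }
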
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
new file mode 100644
index 0000000..44b71c9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Function;
+
+/** Class used for a single file in LowLevelCacheImpl, etc. */
+class FileCache<T> {
+  private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+  private final T cache;
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  private FileCache(T value) {
+    this.cache = value;
+  }
+
+  public T getCache() {
+    return cache;
+  }
+
+  boolean incRef() {
+    while (true) {
+      int value = refCount.get();
+      if (value == EVICTED_REFCOUNT) return false;
+      if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
+      assert value >= 0;
+      if (refCount.compareAndSet(value, value + 1)) return true;
+    }
+  }
+
+  void decRef() {
+    int value = refCount.decrementAndGet();
+    if (value < 0) {
+      throw new AssertionError("Unexpected refCount " + value);
+    }
+  }
+
+  boolean startEvicting() {
+    while (true) {
+      int value = refCount.get();
+      if (value != 1) return false;
+      if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+    }
+  }
+
+  void commitEvicting() {
+    boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+    assert result;
+  }
+
+  void abortEvicting() {
+    boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+    assert result;
+  }
+
+  /**
+   * All this mess is necessary because we want to be able to remove sub-caches for fully
+   * evicted files. It may actually be better to have non-nested map with object keys?
+   */
+  public static <T> FileCache<T> getOrAddFileSubCache(
+      ConcurrentHashMap<Object, FileCache<T>> cache, Object fileKey,
+      Function<Void, T> createFunc) {
+    FileCache<T> newSubCache = null;
+    while (true) { // Overwhelmingly executes once.
+      FileCache<T> subCache = cache.get(fileKey);
+      if (subCache != null) {
+        if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+        if (newSubCache == null) {
+          newSubCache = new FileCache<T>(createFunc.apply(null));
+          newSubCache.incRef();
+        }
+        // Found a stale value we cannot incRef; try to replace it with new value.
+        if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
+        continue; // Someone else replaced/removed a stale value, try again.
+      }
+      // No value found.
+      if (newSubCache == null) {
+        newSubCache = new FileCache<T>(createFunc.apply(null));
+        newSubCache.incRef();
+      }
+      FileCache<T> oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
+      if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+      if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+      // Someone created one in parallel and then it went stale.
+      if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
+      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+    }
+  }
+}
\ No newline at end of file

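A usage sketch for the extracted FileCache class (illustrative; FileCache is package-private, so this assumes code in the same org.apache.hadoop.hive.llap.cache package, and the String value type is arbitrary). It mirrors how the caches below use the helper.

  package org.apache.hadoop.hive.llap.cache;

  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentSkipListMap;

  import com.google.common.base.Function;

  class FileCacheUsageSketch {
    private static final Function<Void, ConcurrentSkipListMap<Long, String>> CTOR =
        new Function<Void, ConcurrentSkipListMap<Long, String>>() {
          @Override
          public ConcurrentSkipListMap<Long, String> apply(Void input) {
            return new ConcurrentSkipListMap<>();
          }
        };

    // getOrAddFileSubCache returns the sub-cache already incRef-ed (so the cleanup
    // thread cannot evict it from under us); always pair it with decRef.
    void put(ConcurrentHashMap<Object, FileCache<ConcurrentSkipListMap<Long, String>>> files,
        Object fileKey, long offset, String value) {
      FileCache<ConcurrentSkipListMap<Long, String>> fc =
          FileCache.getOrAddFileSubCache(files, fileKey, CTOR);
      try {
        fc.getCache().put(offset, value);
      } finally {
        fc.decRef();
      }
    }
  }
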
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
new file mode 100644
index 0000000..17c7ee6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hive.common.util.Ref;
+
+/** Class used to slowly clean up a map of FileCache-s. */
+abstract class FileCacheCleanupThread<T> extends Thread {
+  private final long approxCleanupIntervalSec;
+  private final AtomicInteger newEvictions;
+  private final ConcurrentHashMap<Object, FileCache<T>> fileMap;
+
+  public FileCacheCleanupThread(String name, ConcurrentHashMap<Object, FileCache<T>> fileMap,
+      AtomicInteger newEvictions, long cleanupInterval) {
+    super(name);
+    this.fileMap = fileMap;
+    this.newEvictions = newEvictions;
+    this.approxCleanupIntervalSec = cleanupInterval;
+    setDaemon(true);
+    setPriority(1);
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        doOneCleanupRound();
+      } catch (InterruptedException ex) {
+        LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+        Thread.currentThread().interrupt();
+        break;
+      } catch (Throwable t) {
+        LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+        break;
+      }
+    }
+  }
+
+  private void doOneCleanupRound() throws InterruptedException {
+    while (true) {
+      int evictionsSinceLast = newEvictions.getAndSet(0);
+      if (evictionsSinceLast > 0) break;
+      synchronized (newEvictions) {
+        newEvictions.wait(10000);
+      }
+    }
+    // Duration is an estimate; if the size of the map changes, it can be very different.
+    long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
+    int leftToCheck = 0; // approximate
+    for (FileCache<T> fc : fileMap.values()) {
+      leftToCheck += getCacheSize(fc);
+    }
+    // Iterate thru all the filecaches. This is best-effort.
+    // If these super-long-lived iterators affect the map in some bad way,
+    // we'd need to sleep once per round instead.
+    Iterator<Map.Entry<Object, FileCache<T>>> iter = fileMap.entrySet().iterator();
+    Ref<Boolean> isPastEndTime = Ref.from(false);
+    while (iter.hasNext()) {
+      FileCache<T> fc = iter.next().getValue();
+      if (!fc.incRef()) {
+        throw new AssertionError("Something other than cleanup is removing elements from map");
+      }
+      leftToCheck = cleanUpOneFileCache(fc, leftToCheck, endTime, isPastEndTime);
+      if (getCacheSize(fc) > 0) {
+        fc.decRef();
+        continue;
+      }
+      // FileCache might be empty; see if we can remove it. "tryWriteLock"
+      if (!fc.startEvicting()) continue;
+      if (getCacheSize(fc) == 0) {
+        fc.commitEvicting();
+        iter.remove();
+      } else {
+        fc.abortEvicting();
+      }
+    }
+  }
+
+  protected abstract int getCacheSize(FileCache<T> fc);
+
+  protected abstract int cleanUpOneFileCache(FileCache<T> fc, int leftToCheck, long endTime,
+      Ref<Boolean> isPastEndTime) throws InterruptedException;
+}
\ No newline at end of file

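A worked pacing example for cleanUpOneFileCache implementations (standalone sketch; the numbers are just for illustration and match the defaults used by the caches below).

  public class CleanupPacingSketch {
    public static void main(String[] args) {
      long approxCleanupIntervalSec = 600; // DEFAULT_CLEANUP_INTERVAL in the caches below
      int leftToCheck = 100000;            // approximate entry count across all file caches
      // The per-cache implementations sleep (endTime - now) / (1000000L * leftToCheck) ms
      // per entry; at the start of a round that is roughly intervalNs / (1e6 * leftToCheck),
      // so one round is spread over about the whole interval. Past endTime it drops to 1 ms.
      long intervalNs = approxCleanupIntervalSec * 1000000000L;
      long perEntrySleepMs = intervalNs / (1000000L * leftToCheck);
      System.out.println("per-entry sleep ~" + perEntrySleepMs + " ms"); // prints 6
    }
  }
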
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index d1a961c..7d5c101 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -139,4 +139,11 @@ public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryB
     int refCount = this.refCount.get();
     return "0x" + Integer.toHexString(System.identityHashCode(this)) + "(" + refCount + ")";
   }
+
+  public static String toDataString(MemoryBuffer s) {
+    if (s == null || s.getByteBufferRaw().remaining() == 0) return "" + s;
+    byte b = s.getByteBufferRaw().get(s.getByteBufferRaw().position());
+    int i = (b < 0) ? -b : b;
+    return s + " (0x" + Integer.toHexString(i) + ")";
+  }
 }

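A small usage sketch for the new debug helper (the wrapper class and wording are assumptions; it assumes code that can see LlapDataBuffer, which is public in the cache package).

  import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
  import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;

  class DataStringSketch {
    // toDataString appends the buffer's first byte in hex to its usual toString,
    // which makes it easy to spot-check which cached stream a buffer belongs to.
    static String describe(MemoryBuffer buf) {
      return "stream starts with " + LlapDataBuffer.toDataString(buf);
    }
  }
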
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index ea458ca..72980ae 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -32,33 +32,45 @@ import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 
 public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
-  private final EvictionAwareAllocator allocator;
+  private final Allocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
   private Thread cleanupThread = null;
-  private final ConcurrentHashMap<Object, FileCache> cache =
-      new ConcurrentHashMap<Object, FileCache>();
+  // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
+  //       In fact, CSLM has slow single-threaded operation, and one file is probably often read
+  //       by just one (or few) threads, so a much more simple DS with locking might be better.
+  //       Let's use CSLM for now, since it's available.
+  private final ConcurrentHashMap<Object,
+      FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> cache = new ConcurrentHashMap<>();
   private final LowLevelCachePolicy cachePolicy;
   private final long cleanupInterval;
   private final LlapDaemonCacheMetrics metrics;
   private final boolean doAssumeGranularBlocks;
 
+  private static final Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>> CACHE_CTOR =
+      new Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>>() {
+        @Override
+        public ConcurrentSkipListMap<Long, LlapDataBuffer> apply(Void input) {
+          return new ConcurrentSkipListMap<>();
+        }
+      };
+
   public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
-      EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks) {
+      Allocator allocator, boolean doAssumeGranularBlocks) {
     this(metrics, cachePolicy, allocator, doAssumeGranularBlocks, DEFAULT_CLEANUP_INTERVAL);
   }
 
   @VisibleForTesting
   LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
-      EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
-
+      Allocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
     LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval);
     this.cachePolicy = cachePolicy;
     this.allocator = allocator;
@@ -69,7 +81,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
 
   public void startThreads() {
     if (cleanupInterval < 0) return;
-    cleanupThread = new CleanupThread(cleanupInterval);
+    cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
     cleanupThread.start();
   }
 
@@ -78,7 +90,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
       DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
     if (ranges == null) return null;
     DiskRangeList prev = ranges.prev;
-    FileCache subCache = cache.get(fileKey);
+    FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache = cache.get(fileKey);
     if (subCache == null || !subCache.incRef()) {
       long totalMissed = ranges.getTotalLength();
       metrics.incrCacheRequestedBytes(totalMissed);
@@ -102,7 +114,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         metrics.incrCacheRequestedBytes(current.getLength());
         // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
         DiskRangeList next = current.next;
-        getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
+        getOverlappingRanges(baseOffset, current, subCache.getCache(), factory, gotAllData);
         current = next;
       }
     } finally {
@@ -234,7 +246,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
     long[] result = null;
     assert buffers.length == ranges.length;
-    FileCache subCache = getOrAddFileSubCache(fileKey);
+    FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
+        FileCache.getOrAddFileSubCache(cache, fileKey, CACHE_CTOR);
     try {
       for (int i = 0; i < ranges.length; ++i) {
         LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
@@ -247,7 +260,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
         buffer.declaredCachedLength = ranges[i].getLength();
         while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
-          LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+          LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer);
           if (oldVal == null) {
             // Cached successfully, add to policy.
             cachePolicy.cache(buffer, priority);
@@ -287,7 +300,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
             break;
           }
           // We found some old value but couldn't incRef it; remove it.
-          subCache.cache.remove(offset, oldVal);
+          subCache.getCache().remove(offset, oldVal);
         }
       }
     } finally {
@@ -296,38 +309,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
     return result;
   }
 
-  /**
-   * All this mess is necessary because we want to be able to remove sub-caches for fully
-   * evicted files. It may actually be better to have non-nested map with object keys?
-   */
-  private FileCache getOrAddFileSubCache(Object fileKey) {
-    FileCache newSubCache = null;
-    while (true) { // Overwhelmingly executes once.
-      FileCache subCache = cache.get(fileKey);
-      if (subCache != null) {
-        if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
-        if (newSubCache == null) {
-          newSubCache = new FileCache();
-          newSubCache.incRef();
-        }
-        // Found a stale value we cannot incRef; try to replace it with new value.
-        if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
-        continue; // Someone else replaced/removed a stale value, try again.
-      }
-      // No value found.
-      if (newSubCache == null) {
-        newSubCache = new FileCache();
-        newSubCache.incRef();
-      }
-      FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
-      if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
-      if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
-      // Someone created one in parallel and then it went stale.
-      if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
-      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
-    }
-  }
-
   private static int align64(int number) {
     return ((number + 63) & ~63);
   }
@@ -370,134 +351,44 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
 
   @Override
   public final void notifyEvicted(MemoryBuffer buffer) {
-    allocator.deallocateEvicted(buffer);
     newEvictions.incrementAndGet();
   }
 
-  private static class FileCache {
-    private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
-    // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
-    //       In fact, CSLM has slow single-threaded operation, and one file is probably often read
-    //       by just one (or few) threads, so a much more simple DS with locking might be better.
-    //       Let's use CSLM for now, since it's available.
-    private final ConcurrentSkipListMap<Long, LlapDataBuffer> cache
-      = new ConcurrentSkipListMap<Long, LlapDataBuffer>();
-    private final AtomicInteger refCount = new AtomicInteger(0);
-
-    boolean incRef() {
-      while (true) {
-        int value = refCount.get();
-        if (value == EVICTED_REFCOUNT) return false;
-        if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
-        assert value >= 0;
-        if (refCount.compareAndSet(value, value + 1)) return true;
-      }
-    }
-
-    void decRef() {
-      int value = refCount.decrementAndGet();
-      if (value < 0) {
-        throw new AssertionError("Unexpected refCount " + value);
-      }
-    }
-
-    boolean startEvicting() {
-      while (true) {
-        int value = refCount.get();
-        if (value != 1) return false;
-        if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
-      }
-    }
-
-    void commitEvicting() {
-      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
-      assert result;
-    }
-
-    void abortEvicting() {
-      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
-      assert result;
-    }
-  }
-
-  private final class CleanupThread extends Thread {
-    private final long approxCleanupIntervalSec;
+  private static final class CleanupThread
+    extends FileCacheCleanupThread<ConcurrentSkipListMap<Long, LlapDataBuffer>> {
 
-    public CleanupThread(long cleanupInterval) {
-      super("Llap low level cache cleanup thread");
-      this.approxCleanupIntervalSec = cleanupInterval;
-      setDaemon(true);
-      setPriority(1);
+    public CleanupThread(ConcurrentHashMap<Object,
+        FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> fileMap,
+        AtomicInteger newEvictions, long cleanupInterval) {
+      super("Llap low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
     }
 
     @Override
-    public void run() {
-      while (true) {
-        try {
-          doOneCleanupRound();
-        } catch (InterruptedException ex) {
-          LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
-          Thread.currentThread().interrupt();
-          break;
-        } catch (Throwable t) {
-          LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
-          break;
-        }
-      }
+    protected int getCacheSize( FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc) {
+      return fc.getCache().size();
     }
 
-    private void doOneCleanupRound() throws InterruptedException {
-      while (true) {
-        int evictionsSinceLast = newEvictions.getAndSet(0);
-        if (evictionsSinceLast > 0) break;
-        synchronized (newEvictions) {
-          newEvictions.wait(10000);
-        }
-      }
-      // Duration is an estimate; if the size of the map changes, it can be very different.
-      long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
-      int leftToCheck = 0; // approximate
-      for (FileCache fc : cache.values()) {
-        leftToCheck += fc.cache.size();
-      }
-      // Iterate thru all the filecaches. This is best-effort.
-      // If these super-long-lived iterators affect the map in some bad way,
-      // we'd need to sleep once per round instead.
-      Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
-      boolean isPastEndTime = false;
-      while (iter.hasNext()) {
-        FileCache fc = iter.next().getValue();
-        if (!fc.incRef()) {
-          throw new AssertionError("Something other than cleanup is removing elements from map");
-        }
-        // Iterate thru the file cache. This is best-effort.
-        Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.cache.entrySet().iterator();
-        boolean isEmpty = true;
-        while (subIter.hasNext()) {
-          long time = -1;
-          isPastEndTime = isPastEndTime || ((time = System.nanoTime()) >= endTime);
-          Thread.sleep(((leftToCheck <= 0) || isPastEndTime)
-              ? 1 : (endTime - time) / (1000000L * leftToCheck));
-          if (subIter.next().getValue().isInvalid()) {
-            subIter.remove();
-          } else {
-            isEmpty = false;
-          }
-          --leftToCheck;
-        }
-        if (!isEmpty) {
-          fc.decRef();
-          continue;
-        }
-        // FileCache might be empty; see if we can remove it. "tryWriteLock"
-        if (!fc.startEvicting()) continue;
-        if (fc.cache.isEmpty()) {
-          fc.commitEvicting();
-          iter.remove();
+    @Override
+    public int cleanUpOneFileCache(
+        FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc,
+        int leftToCheck, long endTime, Ref<Boolean> isPastEndTime)
+        throws InterruptedException {
+      // Iterate thru the file cache. This is best-effort.
+      Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.getCache().entrySet().iterator();
+      boolean isEmpty = true;
+      while (subIter.hasNext()) {
+        long time = -1;
+        isPastEndTime.value = isPastEndTime.value || ((time = System.nanoTime()) >= endTime);
+        Thread.sleep(((leftToCheck <= 0) || isPastEndTime.value)
+            ? 1 : (endTime - time) / (1000000L * leftToCheck));
+        if (subIter.next().getValue().isInvalid()) {
+          subIter.remove();
         } else {
-          fc.abortEvicting();
+          isEmpty = false;
         }
+        --leftToCheck;
       }
+      return leftToCheck;
     }
   }
 
@@ -516,11 +407,12 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   @Override
   public String debugDumpForOom() {
     StringBuilder sb = new StringBuilder("File cache state ");
-    for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
+    for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e :
+      cache.entrySet()) {
       if (!e.getValue().incRef()) continue;
       try {
         sb.append("\n  file " + e.getKey());
-        for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
+        for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
           if (e2.getValue().incRef() < 0) continue;
           try {
             sb.append("\n    [").append(e2.getKey()).append(", ")

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
new file mode 100644
index 0000000..53e3275
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+
+import com.google.common.base.Function;
+
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump {
+  private static final int DEFAULT_CLEANUP_INTERVAL = 600;
+  private final Allocator allocator;
+  private final AtomicInteger newEvictions = new AtomicInteger(0);
+  private Thread cleanupThread = null;
+  private final ConcurrentHashMap<Object, FileCache<FileData>> cache = new ConcurrentHashMap<>();
+  private final LowLevelCachePolicy cachePolicy;
+  private final long cleanupInterval;
+  private final LlapDaemonCacheMetrics metrics;
+
+  private static final class StripeInfoComparator implements
+      Comparator<StripeData> {
+    @Override
+    public int compare(StripeData o1, StripeData o2) {
+      int starts = Long.compare(o1.knownTornStart, o2.knownTornStart);
+      if (starts != 0) return starts;
+      starts = Long.compare(o1.firstStart, o2.firstStart);
+      if (starts != 0) return starts;
+      assert (o1.lastStart == o2.lastStart) == (o1.lastEnd == o2.lastEnd);
+      return Long.compare(o1.lastStart, o2.lastStart);
+    }
+  }
+
+  public static class FileData {
+    /**
+     * RW lock ensures we have a consistent view of the file data, which is important given that
+     * we generate "stripe" boundaries arbitrarily. Reading buffer data itself doesn't require
+     * that this lock is held; however, everything else in stripes list does.
+     * TODO: make more granular? We only care that each one reader sees consistent boundaries.
+     *       So, we could shallow-copy the stripes list, then have individual locks inside each.
+     */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Object fileKey;
+    private final int colCount;
+    private ArrayList<StripeData> stripes;
+
+    public FileData(Object fileKey, int colCount) {
+      this.fileKey = fileKey;
+      this.colCount = colCount;
+    }
+
+    public void toString(StringBuilder sb) {
+      sb.append("File data for ").append(fileKey).append(" with ").append(colCount)
+        .append(" columns: ").append(stripes);
+    }
+
+    public int getColCount() {
+      return colCount;
+    }
+
+    public ArrayList<StripeData> getData() {
+      return stripes;
+    }
+
+    public void addStripe(StripeData sd) {
+      if (stripes == null) {
+        stripes = new ArrayList<>();
+      }
+      stripes.add(sd);
+    }
+
+    @Override
+    public String toString() {
+      return "[fileKey=" + fileKey + ", colCount=" + colCount + ", stripes=" + stripes + "]";
+    }
+  }
+
+  public static final class StripeData {
+    // In LRR case, if we just store 2 boundaries (which could be split boundaries or reader
+    // positions, we wouldn't be able to account for torn rows correctly because the semantics of
+    // our "exact" reader positions, and inexact split boundaries, are different. We cannot even
+    // tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file split
+    // wrt first row when caching - we may produce incorrect result if we adjust the split
+    // boundary, and also if we don't adjust it, depending where it falls. At best, we'd end up
+    // with spurious disk reads if we cache on row boundaries but splits include torn rows.
+    // This structure implies that when reading a split, we skip the first torn row but fully
+    // read the last torn row (as LineRecordReader does). If we want to support a different scheme,
+    // we'd need to store more offsets and make logic account for that.
+    private long knownTornStart; // This can change based on new splits.
+    private final long firstStart, lastStart, lastEnd;
+    // TODO: we can actually consider storing ALL the delta encoded row offsets - not a lot of
+    //       overhead compared to the data itself, and with row offsets, we could use columnar
+    //       blocks for inconsistent splits. We are not optimizing for inconsistent splits for now.
+
+    private final long rowCount;
+    private final OrcProto.ColumnEncoding[] encodings;
+    private LlapDataBuffer[][][] data; // column index, stream type, buffers
+
+    public StripeData(long knownTornStart, long firstStart, long lastStart, long lastEnd,
+        long rowCount, ColumnEncoding[] encodings) {
+      this.knownTornStart = knownTornStart;
+      this.firstStart = firstStart;
+      this.lastStart = lastStart;
+      this.lastEnd = lastEnd;
+      this.encodings = encodings;
+      this.rowCount = rowCount;
+      this.data = encodings == null ? null : new LlapDataBuffer[encodings.length][][];
+    }
+ 
+    @Override
+    public String toString() {
+      return toCoordinateString() + " with encodings [" + Arrays.toString(encodings)
+          .replace('\n', ' ') + "] and data " + SerDeLowLevelCacheImpl.toString(data);
+    }
+
+    public long getKnownTornStart() {
+      return knownTornStart;
+    }
+
+    public long getFirstStart() {
+      return firstStart;
+    }
+
+    public long getLastStart() {
+      return lastStart;
+    }
+
+    public long getLastEnd() {
+      return lastEnd;
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    public OrcProto.ColumnEncoding[] getEncodings() {
+      return encodings;
+    }
+
+    public LlapDataBuffer[][][] getData() {
+      return data;
+    }
+
+    public String toCoordinateString() {
+      return "stripe kts " + knownTornStart + " from "
+          + firstStart + " to [" + lastStart + ", " + lastEnd + ")";
+    }
+
+    public static StripeData duplicateForResults(StripeData s) {
+      return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
+          s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
+    }
+
+    public void setKnownTornStart(long value) {
+      knownTornStart = value;
+    }
+  }
+
+  public static String toString(LlapDataBuffer[][][] data) {
+    if (data == null) return "null";
+    StringBuilder sb = new StringBuilder("[");
+    for (int i = 0; i < data.length; ++i) {
+      LlapDataBuffer[][] colData = data[i];
+      if (colData == null) {
+        sb.append("null, ");
+        continue;
+      }
+      sb.append("[");
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) {
+          sb.append("null, ");
+          continue;
+        }
+        sb.append("[");
+        for (int k = 0; k < streamData.length; ++k) {
+          LlapDataBuffer s = streamData[k];
+          sb.append(LlapDataBuffer.toDataString(s));
+        }
+        sb.append("], ");
+      }
+      sb.append("], ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+  
+
+  public static String toString(LlapDataBuffer[][] data) {
+    if (data == null) return "null";
+    StringBuilder sb = new StringBuilder("[");
+    for (int j = 0; j < data.length; ++j) {
+      LlapDataBuffer[] streamData = data[j];
+      if (streamData == null) {
+        sb.append("null, ");
+        continue;
+      }
+      sb.append("[");
+      for (int k = 0; k < streamData.length; ++k) {
+        LlapDataBuffer s = streamData[k];
+        sb.append(LlapDataBuffer.toDataString(s));
+      }
+      sb.append("], ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public SerDeLowLevelCacheImpl(
+      LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, Allocator allocator) {
+    this.cachePolicy = cachePolicy;
+    this.allocator = allocator;
+    this.cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
+    this.metrics = metrics;
+    LlapIoImpl.LOG.info("SerDe low-level level cache; cleanup interval {} sec", cleanupInterval);
+  }
+
+  public void startThreads() {
+    if (cleanupInterval < 0) return;
+    cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
+    cleanupThread.start();
+  }
+
+  public FileData getFileData(Object fileKey, long start, long end, boolean[] includes,
+      DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData)
+          throws IOException {
+    FileCache<FileData> subCache = cache.get(fileKey);
+    if (subCache == null || !subCache.incRef()) {
+      LlapIoImpl.CACHE_LOGGER.info("TODO# cannot find cache for " + fileKey + " in " + cache);
+      markAllAsMissed(start, end, qfCounters, gotAllData);
+      return null;
+    }
+
+    try {
+      FileData cached = subCache.getCache();
+      cached.rwLock.readLock().lock();
+      LlapIoImpl.CACHE_LOGGER.info(("TODO# cache for " + fileKey + " is " + subCache.getCache()).replace('\n', ' '));
+      try {
+        if (cached.stripes == null) {
+          LlapIoImpl.CACHE_LOGGER.info("TODO# cannot find any stripes for " + fileKey);
+          markAllAsMissed(start, end, qfCounters, gotAllData);
+          return null;
+        }
+        if (includes.length > cached.colCount) {
+          throw new IOException("Includes " + DebugUtils.toString(includes) + " for "
+              + cached.colCount + " columns");
+        }
+        FileData result = new FileData(cached.fileKey, cached.colCount);
+        if (gotAllData != null) {
+          gotAllData.value = true;
+        }
+        // We will adjust start and end so that we could record the metrics; save the originals.
+        long origStart = start, origEnd = end;
+        // startIx is inclusive, endIx is exclusive.
+        int startIx = Integer.MIN_VALUE, endIx = Integer.MIN_VALUE;
+        LlapIoImpl.CACHE_LOGGER.info("TODO# Looking for data between " + start + " and " + end);
+        for (int i = 0; i < cached.stripes.size() && endIx == Integer.MIN_VALUE; ++i) {
+          StripeData si = cached.stripes.get(i);
+          LlapIoImpl.CACHE_LOGGER.info("TODO# looking at " + si.toCoordinateString());
+
+          if (startIx == i) {
+            // The start of the split was in the middle of the previous slice.
+            start = si.knownTornStart;
+          } else if (startIx == Integer.MIN_VALUE) {
+            // Determine if we need to read this slice for the split.
+            if (si.lastEnd <= start) continue; // Slice before the start of the split.
+            // Start of the split falls somewhere within or before this slice.
+            // Note the ">=" - LineRecordReader will skip the first row even if we start
+            // directly at its start, because it cannot know if it's the start or not.
+            // Unless it's 0; note that we DO give 0 special treatment here, unlike the EOF below,
+            // because zero is zero. Need to mention it in Javadoc.
+            if (start == 0 && si.firstStart == 0) {
+              startIx = i;
+            } else if (start >= si.firstStart) {
+              // If the start of the split points into the middle of the cached slice, we cannot
+              // use the cached block - it's encoded and columnar, so we cannot map the file
+              // offset to some "offset" in "middle" of the slice (but see TODO for firstStart).
+              startIx = i + 1;
+              // continue;
+            } else {
+              // Start of the split is before this slice.
+              startIx = i; // Simple case - we will read cache from the split start offset.
+              start = si.knownTornStart;
+            }
+          }
+
+          // Determine if this (or previous) is the last slice we need to read for this split.
+          if (startIx != Integer.MIN_VALUE && endIx == Integer.MIN_VALUE) {
+            if (si.lastEnd <= end) {
+              // The entire current slice is part of the split. Note that if split end EQUALS
+              // lastEnd, the split would also read the next row, so we do need to look at the
+              // next slice, if any (although we'd probably find we cannot use it).
+              // Note also that we DO NOT treat end-of-file differently here, cause we do not know
+              // of any such thing. The caller must handle lastEnd vs end of split vs end of file
+              // match correctly in terms of how LRR handles them. See above for start-of-file.
+              if (i + 1 != cached.stripes.size()) continue;
+              endIx = i + 1;
+              end = si.lastEnd;
+            } else if (si.lastStart <= end) {
+              // The split ends within (and would read) the last row of this slice. Exact match.
+              endIx = i + 1;
+              end = si.lastEnd;
+            } else {
+              // Either the slice comes entirely after the end of split (following a gap in cached
+              // data); or the split ends in the middle of the slice, so it's the same as in the
+              // startIx logic w.r.t. the partial match; so, we either don't want to, or cannot,
+              // use this. There's no need to distinguish these two cases for now.
+              endIx = i;
+              end = (endIx > 0) ? cached.stripes.get(endIx - 1).lastEnd : start;
+            }
+          }
+        }
+        LlapIoImpl.CACHE_LOGGER.info("TODO# determined stripe indexes " + startIx + ", " + endIx);
+        if (endIx <= startIx) {
+          if (gotAllData != null) {
+            gotAllData.value = false;
+          }
+          return null;  // No data for the split, or it fits in the middle of one or two slices.
+        }
+        if (start > origStart || end < origEnd) {
+          if (gotAllData != null) {
+            gotAllData.value = false;
+          }
+          long totalMiss = Math.max(0, origEnd - end) + Math.max(0, start - origStart);
+          metrics.incrCacheRequestedBytes(totalMiss);
+          if (qfCounters != null) {
+            qfCounters.recordCacheMiss(totalMiss);
+          }
+        }
+
+        result.stripes = new ArrayList<>(endIx - startIx);
+        for (int stripeIx = startIx; stripeIx < endIx; ++stripeIx) {
+          getCacheDataForOneSlice(stripeIx, cached, result, gotAllData, includes, qfCounters);
+        }
+        return result;
+      } finally {
+        cached.rwLock.readLock().unlock();
+      }
+    } finally {
+      subCache.decRef();
+    }
+  }
+
+
+  private void getCacheDataForOneSlice(int stripeIx, FileData cached, FileData result,
+      BooleanRef gotAllData, boolean[] includes, LowLevelCacheCounters qfCounters) {
+    StripeData cStripe = cached.stripes.get(stripeIx);
+    LlapIoImpl.CACHE_LOGGER.info("TODO# got stripe in cache " + cStripe);
+    StripeData stripe = StripeData.duplicateForResults(cStripe);
+    result.stripes.add(stripe);
+    boolean isMissed = false;
+    for (int colIx = 0; colIx < cached.colCount; ++colIx) {
+      if (!includes[colIx]) continue;
+      if (cStripe.encodings[colIx] == null || cStripe.data[colIx] == null) {
+        assert cStripe.data[colIx] == null; // No encoding => must have no data.
+        isMissed = true;
+        if (gotAllData != null) {
+          gotAllData.value = false;
+        }
+        continue;
+      }
+      stripe.encodings[colIx] = cStripe.encodings[colIx];
+      LlapDataBuffer[][] cColData = cStripe.data[colIx];
+      assert cColData != null;
+      for (int streamIx = 0;
+          cColData != null && streamIx < cColData.length; ++streamIx) {
+        LlapDataBuffer[] streamData = cColData[streamIx];
+        // Note: this relies on the fact that we always evict the entire column, so if
+        //       we have the column data, we assume we have all the streams we need.
+        if (streamData == null) continue;
+        for (int i = 0; i < streamData.length; ++i) { // Finally, we are going to use "i"!
+          if (!lockBuffer(streamData[i], true)) {
+            LlapIoImpl.CACHE_LOGGER.info("TODO# couldn't lock data for stripe at "
+                + stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
+
+            cColData = null;
+            isMissed = true;
+            handleRemovedColumnData(cColData);
+            if (gotAllData != null) {
+              gotAllData.value = false;
+            }
+            break;
+          }
+        }
+      }
+      // At this point, we have arrived at the level where we need all the data, and the
+      // arrays never change. So we will just do a shallow assignment here instead of copy.
+      stripe.data[colIx] = cColData;
+    }
+    doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
+  }
+
+
+  private void doMetricsStuffForOneSlice(
+      LowLevelCacheCounters qfCounters, StripeData stripe, boolean isMissed) {
+    // Slice boundaries may not match split boundaries due to torn rows in either direction,
+    // so this counter may not be consistent with splits. This is also why we increment
+    // requested bytes here, instead of based on the split - we don't want the metrics to be
+    // inconsistent with each other. No matter what we determine here, at least we'll account
+    // for both in the same manner.
+    long bytes = stripe.lastEnd - stripe.knownTornStart;
+    metrics.incrCacheRequestedBytes(bytes);
+    if (!isMissed) {
+      metrics.incrCacheHitBytes(bytes);
+    }
+    if (qfCounters != null) {
+      if (isMissed) {
+        qfCounters.recordCacheMiss(bytes);
+      } else {
+        qfCounters.recordCacheHit(bytes);
+      }
+    }
+  }
+
+  private void markAllAsMissed(long from, long to,
+      LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
+    if (qfCounters != null) {
+      metrics.incrCacheRequestedBytes(to - from);
+      qfCounters.recordCacheMiss(to - from);
+    }
+    if (gotAllData != null) {
+      gotAllData.value = false;
+    }
+  }
+
+  private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
+    int rc = buffer.incRef();
+    if (rc > 0) {
+      metrics.incrCacheNumLockedBuffers();
+    }
+    if (doNotifyPolicy && rc == 1) {
+      // We have just locked a buffer that wasn't previously locked.
+      cachePolicy.notifyLock(buffer);
+    }
+    return rc > 0;
+  }
+
+  public void putFileData(final FileData data, Priority priority,
+      LowLevelCacheCounters qfCounters) {
+    // TODO: buffers are accounted for at allocation time, but ideally we should report the memory
+    //       overhead from the java objects to memory manager and remove it when discarding file.
+    if (data.stripes == null || data.stripes.isEmpty()) {
+      LlapIoImpl.LOG.warn("Trying to cache FileData with no data for " + data.fileKey);
+      return;
+    }
+    FileCache<FileData> subCache = null;
+    FileData cached = null;
+    data.rwLock.writeLock().lock();
+    try {
+      subCache = FileCache.getOrAddFileSubCache(
+          cache, data.fileKey, new Function<Void, FileData>() {
+        @Override
+        public FileData apply(Void input) {
+          return data; // If we don't have a file cache, we will add this one as is.
+        }
+      });
+      cached = subCache.getCache();
+    } finally {
+      if (data != cached) {
+        data.rwLock.writeLock().unlock();
+      }
+    }
+    try {
+      if (data != cached) {
+        cached.rwLock.writeLock().lock();
+      }
+      try {
+        for (StripeData si : data.stripes) {
+          lockAllBuffersForPut(si, priority);
+        }
+        if (data == cached) {
+          LlapIoImpl.CACHE_LOGGER.info(("TODO# cached new data " + data).replace('\n', ' '));
+          return;
+        }
+        LlapIoImpl.CACHE_LOGGER.info(("TODO# merging old " + cached + " and new " + data).replace('\n', ' '));
+        ArrayList<StripeData> combined = new ArrayList<>(
+            cached.stripes.size() + data.stripes.size());
+        combined.addAll(cached.stripes);
+        combined.addAll(data.stripes);
+        Collections.sort(combined, new StripeInfoComparator());
+        int lastIx = combined.size() - 1;
+        for (int ix = 0; ix < lastIx; ++ix) {
+          StripeData cur = combined.get(ix), next = combined.get(ix + 1);
+          if (cur.lastEnd <= next.firstStart) continue; // All good.
+          if (cur.firstStart == next.firstStart && cur.lastEnd == next.lastEnd) {
+            mergeStripeInfos(cur, next);
+            combined.remove(ix + 1);
+            --lastIx;
+            // Don't recheck with next, only 2 lists each w/o collisions.
+            continue;
+          }
+          // The original lists do not contain collisions, so only one is 'old'.
+          boolean isCurOriginal = cached.stripes.contains(cur);
+          handleRemovedStripeInfo(combined.remove(isCurOriginal ? ix : ix + 1));
+          --ix;
+          --lastIx;
+        }
+        cached.stripes = combined;
+        LlapIoImpl.CACHE_LOGGER.info(("TODO# new cache data is " + combined).replace('\n', ' '));
+
+      } finally {
+        cached.rwLock.writeLock().unlock();
+      }
+    } finally {
+      subCache.decRef();
+    }
+  }
+
+  private void lockAllBuffersForPut(StripeData si, Priority priority) {
+    for (int i = 0; i < si.data.length; ++i) {
+      LlapDataBuffer[][] colData = si.data[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
+          assert canLock;
+          /*LlapIoImpl.LOG.info("TODO# Calling cache on "
+              + System.identityHashCode(streamData[k]) + ": " + i + ", " + j + ", " + k);*/
+          cachePolicy.cache(streamData[k], priority);
+          streamData[k].declaredCachedLength = streamData[k].getByteBufferRaw().remaining();
+        }
+      }
+    }
+  }
+
+  private void handleRemovedStripeInfo(StripeData removed) {
+    for (LlapDataBuffer[][] colData : removed.data) {
+      handleRemovedColumnData(colData);
+    }
+  }
+
+  private void handleRemovedColumnData(LlapDataBuffer[][] removed) {
+    // TODO: could we tell the policy that we don't care about these and have them evicted? or we
+    //       could just deallocate them when unlocked, and free memory + handle that in eviction.
+    //       For now, just abandon the blocks - eventually, they'll get evicted.
+  }
+
+  private void mergeStripeInfos(StripeData to, StripeData from) {
+    LlapIoImpl.CACHE_LOGGER.info("TODO# merging slices data: old " + to + " and new " + from);
+    to.knownTornStart = Math.min(to.knownTornStart, from.knownTornStart);
+    if (from.encodings.length != to.encodings.length) {
+      throw new RuntimeException("Different encodings " + from + "; " + to);
+    }
+    for (int colIx = 0; colIx < from.encodings.length; ++colIx) {
+      if (to.encodings[colIx] == null) {
+        to.encodings[colIx] = from.encodings[colIx];
+      } else if (from.encodings[colIx] != null
+          && !to.encodings[colIx].equals(from.encodings[colIx])) {
+        throw new RuntimeException("Different encodings at " + colIx + ": " + from + "; " + to);
+      }
+      LlapDataBuffer[][] fromColData = from.data[colIx];
+      if (fromColData != null) {
+        if (to.data[colIx] != null) {
+          // Note: we assume here that data returned to a caller from the cache is never passed
+          // back in via put. Right now this is safe because handleRemovedColumnData is a no-op;
+          // if we ever evict proactively, we will have to compare the buffers all the way down.
+          handleRemovedColumnData(to.data[colIx]);
+        }
+        to.data[colIx] = fromColData;
+      }
+    }
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBuffer buffer) {
+    unlockBuffer((LlapDataBuffer)buffer, true);
+  }
+
+  @Override
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+    for (MemoryBuffer b : cacheBuffers) {
+      unlockBuffer((LlapDataBuffer)b, true);
+    }
+  }
+
+  private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
+    boolean isLastDecref = (buffer.decRef() == 0);
+    if (handleLastDecRef && isLastDecref) {
+      // This is not pretty, but it is how we detect whether the buffer was cached:
+      // declaredCachedLength is always set at put time, so any buffer returned by a lookup has it.
+      if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
+        cachePolicy.notifyUnlock(buffer);
+      } else {
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
+        }
+        allocator.deallocate(buffer);
+      }
+    }
+    metrics.decrCacheNumLockedBuffers();
+  }
+
+  private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
+  public static LlapDataBuffer allocateFake() {
+    LlapDataBuffer fake = new LlapDataBuffer();
+    fake.initialize(-1, fakeBuf, 0, 1);
+    return fake;
+  }
+
+  public final void notifyEvicted(MemoryBuffer buffer) {
+    newEvictions.incrementAndGet();
+  }
+
+  private final class CleanupThread extends FileCacheCleanupThread<FileData> {
+
+    public CleanupThread(ConcurrentHashMap<Object, FileCache<FileData>> fileMap,
+        AtomicInteger newEvictions, long cleanupInterval) {
+      super("Llap serde low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
+    }
+
+    @Override
+    protected int getCacheSize(FileCache<FileData> fc) {
+      return 1; // Each iteration cleans the file cache as a single unit (unlike the ORC cache).
+    }
+
+    @Override
+    public int cleanUpOneFileCache(FileCache<FileData> fc, int leftToCheck, long endTime,
+        Ref<Boolean> isPastEndTime) throws InterruptedException {
+      FileData fd = fc.getCache();
+      fd.rwLock.writeLock().lock();
+      try {
+        for (StripeData sd : fd.stripes) {
+          for (int colIx = 0; colIx < sd.data.length; ++colIx) {
+            LlapDataBuffer[][] colData = sd.data[colIx];
+            if (colData == null) continue;
+            boolean hasAllData = true;
+            for (int j = 0; (j < colData.length) && hasAllData; ++j) {
+              LlapDataBuffer[] streamData = colData[j];
+              if (streamData == null) continue;
+              for (int k = 0; k < streamData.length; ++k) {
+                LlapDataBuffer buf = streamData[k];
+                hasAllData = hasAllData && lockBuffer(buf, false);
+                if (!hasAllData) break;
+                unlockBuffer(buf, true);
+              }
+            }
+            if (!hasAllData) {
+              handleRemovedColumnData(colData);
+              sd.data[colIx] = null;
+            }
+          }
+        }
+      } finally {
+        fd.rwLock.writeLock().unlock();
+      }
+      return leftToCheck - 1;
+    }
+  }
+
+  @Override
+  public boolean incRefBuffer(MemoryBuffer buffer) {
+    // incRefBuffer implies that the buffer is already locked; it's also called once for new
+    // buffers that are not cached yet. Don't notify the cache policy.
+    return lockBuffer(((LlapDataBuffer)buffer), false);
+  }
+
+  @Override
+  public Allocator getAllocator() {
+    return allocator;
+  }
+
+  @Override
+  public String debugDumpForOom() {
+    StringBuilder sb = new StringBuilder("File cache state ");
+    for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
+      if (!e.getValue().incRef()) continue;
+      try {
+        sb.append("\n  file " + e.getKey());
+        sb.append("\n    [");
+        e.getValue().getCache().toString(sb);
+        sb.append("]");
+      } finally {
+        e.getValue().decRef();
+      }
+    }
+    return sb.toString();
+  }
+}
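
The locking code above boils down to a reference count on each buffer: a successful incRef()
pins the buffer in memory, and a failed (non-positive) one tells the caller, including the
cleanup thread, that the buffer has already been evicted and the stripe's column data must be
dropped. Below is a minimal, self-contained sketch of that protocol for illustration only - the
class and method names are hypothetical, and the real LlapDataBuffer additionally tracks its
byte buffer, declaredCachedLength, and allocator state.

    import java.util.concurrent.atomic.AtomicInteger;

    /** Hypothetical sketch of a pin/unpin protocol similar to what lockBuffer/unlockBuffer use. */
    public class RefCountedBuffer {
      private static final int EVICTED = -1;
      private final AtomicInteger refCount = new AtomicInteger(0);

      /** Tries to pin the buffer; returns the new count, or a negative value if already evicted. */
      public int incRef() {
        while (true) {
          int current = refCount.get();
          if (current == EVICTED) {
            return EVICTED; // Too late: the eviction policy already claimed this buffer.
          }
          if (refCount.compareAndSet(current, current + 1)) {
            return current + 1;
          }
        }
      }

      /** Unpins the buffer; returns the remaining count (0 means it is evictable again). */
      public int decRef() {
        int newVal = refCount.decrementAndGet();
        if (newVal < 0) {
          throw new AssertionError("Unbalanced decRef: " + newVal);
        }
        return newVal;
      }

      /** Called by the eviction policy; succeeds only if nobody currently holds the buffer. */
      public boolean tryEvict() {
        return refCount.compareAndSet(0, EVICTED);
      }
    }

In these terms, lockBuffer() above treats incRef() > 0 as success, and the cleanup thread uses a
failed lock as its signal that a stream's data is gone and nulls out the whole column slot.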

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 290624d..ac9c1da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -27,21 +27,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -53,15 +46,13 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.NullWritable;
@@ -89,65 +80,77 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
   @SuppressWarnings("rawtypes")
   private final InputFormat sourceInputFormat;
   private final AvoidSplitCombination sourceASC;
-  private final ColumnVectorProducer cvp;
-  private final ExecutorService executor;
+  @SuppressWarnings("deprecation")
+  private final Deserializer sourceSerDe;
+  final ColumnVectorProducer cvp;
+  final ExecutorService executor;
   private final String hostName;
 
   @SuppressWarnings("rawtypes")
-  LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
-      ExecutorService executor) {
-    // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
-    //       We'd need to plumb it thru and use it to get data to cache/etc.
-    assert sourceInputFormat instanceof OrcInputFormat;
+  LlapInputFormat(InputFormat sourceInputFormat, Deserializer sourceSerDe,
+      ColumnVectorProducer cvp, ExecutorService executor) {
     this.executor = executor;
     this.cvp = cvp;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
         ? (AvoidSplitCombination)sourceInputFormat : null;
+    this.sourceSerDe = sourceSerDe;
     this.hostName = HiveStringUtils.getHostname();
   }
 
   @Override
   public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    boolean useLlapIo = true;
-    if (split instanceof LlapAwareSplit) {
-      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
-    }
+    RecordReader<NullWritable, VectorizedRowBatch> noLlap = checkLlapSplit(split, job, reporter);
+    if (noLlap != null) return noLlap;
+
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
 
-    if (!useLlapIo) {
-      LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
-      return sourceInputFormat.getRecordReader(split, job, reporter);
-    }
     FileSplit fileSplit = (FileSplit) split;
     reporter.setStatus(fileSplit.toString());
     try {
       List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
           ? null : ColumnProjectionUtils.getReadColumnIDs(job);
-      LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
-
+      RecordReader<?, ?> sourceRr = null;
+      LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName, cvp,
+          executor, sourceInputFormat, sourceSerDe, reporter);
       if (!rr.init()) {
         return sourceInputFormat.getRecordReader(split, job, reporter);
       }
 
-      // vectorized row batch reader
-      if (isVectorized) {
-        return rr;
-      }
-
-      // row batch to row-by-row reader
-      if (sourceInputFormat instanceof BatchToRowInputFormat) {
-        return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
-            rr, rr.getVectorizedRowBatchCtx(), includedCols));
-      }
-
-      return sourceInputFormat.getRecordReader(split, job, reporter);
+      return wrapLlapReader(isVectorized, includedCols, rr, split, job, reporter);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
   }
 
+  public RecordReader<NullWritable, VectorizedRowBatch> wrapLlapReader(
+      boolean isVectorized, List<Integer> includedCols, LlapRecordReader rr,
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    // vectorized row batch reader
+    if (isVectorized) {
+      return rr;
+    } else if (sourceInputFormat instanceof BatchToRowInputFormat) {
+      return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
+          rr, rr.getVectorizedRowBatchCtx(), includedCols));
+    } else {
+      LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+      return sourceInputFormat.getRecordReader(split, job, reporter);
+    }
+  }
+
+  public RecordReader<NullWritable, VectorizedRowBatch> checkLlapSplit(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    boolean useLlapIo = true;
+    if (split instanceof LlapAwareSplit) {
+      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
+    }
+    if (useLlapIo) return null;
+
+    LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+    return sourceInputFormat.getRecordReader(split, job, reporter);
+  }
+
   // Returning either a vectorized or non-vectorized reader from the same call requires breaking
   // generics... this is how vectorization currently works.
   @SuppressWarnings("unchecked")
@@ -160,276 +163,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
     return sourceInputFormat.getSplits(job, numSplits);
   }
 
-  private class LlapRecordReader
-      implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
-    private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
-    private final FileSplit split;
-    private final List<Integer> columnIds;
-    private final SearchArgument sarg;
-    private final String[] columnNames;
-    private final VectorizedRowBatchCtx rbCtx;
-    private final Object[] partitionValues;
-
-    private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
-    private ColumnVectorBatch lastCvb = null;
-    private boolean isFirst = true;
-
-    private Throwable pendingError = null;
-    /** Vector that is currently being processed by our user. */
-    private boolean isDone = false;
-    private final boolean isClosed = false;
-    private final ConsumerFeedback<ColumnVectorBatch> feedback;
-    private final QueryFragmentCounters counters;
-    private long firstReturnTime;
-
-    private final JobConf jobConf;
-    private final TypeDescription fileSchema;
-    private final boolean[] includedColumns;
-    private final ReadPipeline rp;
-
-    public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
-        String hostName) throws IOException, HiveException {
-      this.jobConf = job;
-      this.split = split;
-      this.columnIds = includedCols;
-      this.sarg = ConvertAstToSearchArg.createFromConf(job);
-      this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      final String fragmentId = LlapTezUtils.getFragmentId(job);
-      final String dagId = LlapTezUtils.getDagId(job);
-      final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
-      MDC.put("dagId", dagId);
-      MDC.put("queryId", queryId);
-      TezCounters taskCounters = null;
-      if (fragmentId != null) {
-        MDC.put("fragmentId", fragmentId);
-        taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
-        LOG.info("Received fragment id: {}", fragmentId);
-      } else {
-        LOG.warn("Not using tez counters as fragment id string is null");
-      }
-      this.counters = new QueryFragmentCounters(job, taskCounters);
-      this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
-
-      MapWork mapWork = Utilities.getMapWork(job);
-      VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
-      rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork);
-
-      int partitionColumnCount = rbCtx.getPartitionColumnCount();
-      if (partitionColumnCount > 0) {
-        partitionValues = new Object[partitionColumnCount];
-        VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
-      } else {
-        partitionValues = null;
-      }
-      boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
-      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(job, isAcidScan, Integer.MAX_VALUE);
-
-      // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-      rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters, schema);
-      feedback = rp;
-      fileSchema = rp.getFileSchema();
-      includedColumns = rp.getIncludedColumns();
-    }
-
-    /**
-     * Starts the data read pipeline
-     */
-    public boolean init() {
-      SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema,
-          rp.getReaderSchema(), includedColumns);
-      for (Integer colId : columnIds) {
-        if (!schemaEvolution.isPPDSafeConversion(colId)) {
-          LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
-          return false;
-        }
-      }
-
-      // perform the data read asynchronously
-      if (executor instanceof StatsRecordingThreadPool) {
-        // Every thread created by this thread pool will use the same handler
-        ((StatsRecordingThreadPool) executor)
-            .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
-      }
-      executor.submit(rp.getReadCallable());
-      return true;
-    }
-
-    @Override
-    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-      assert value != null;
-      if (isClosed) {
-        throw new AssertionError("next called after close");
-      }
-      // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
-      boolean wasFirst = isFirst;
-      if (isFirst) {
-        if (partitionValues != null) {
-          rbCtx.addPartitionColsToBatch(value, partitionValues);
-        }
-        isFirst = false;
-      }
-      ColumnVectorBatch cvb = null;
-      try {
-        cvb = nextCvb();
-      } catch (InterruptedException e) {
-        // Query might have been canceled. Stop the background processing.
-        feedback.stop();
-        throw new IOException(e);
-      }
-      if (cvb == null) {
-        if (wasFirst) {
-          firstReturnTime = counters.startTimeCounter();
-        }
-        counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
-        return false;
-      }
-      if (columnIds.size() != cvb.cols.length) {
-        throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
-            + " included, but the reader returned " + cvb.cols.length);
-      }
-      // VRB was created from VrbCtx, so we already have pre-allocated column vectors
-      for (int i = 0; i < cvb.cols.length; ++i) {
-        // Return old CVs (if any) to caller. We assume these things all have the same schema.
-        cvb.swapColumnVector(i, value.cols, columnIds.get(i));
-      }
-      value.selectedInUse = false;
-      value.size = cvb.size;
-      if (wasFirst) {
-        firstReturnTime = counters.startTimeCounter();
-      }
-      return true;
-    }
-
-    public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
-      return rbCtx;
-    }
-
-    private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
-      @Override
-      public void uncaughtException(final Thread t, final Throwable e) {
-        LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
-            " Message: {}", t.getName(), t.getId(), e.getMessage());
-        setError(e);
-      }
-    }
-
-    ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
-      boolean isFirst = (lastCvb == null);
-      if (!isFirst) {
-        feedback.returnData(lastCvb);
-      }
-      synchronized (pendingData) {
-        // We are waiting for next block. Either we will get it, or be told we are done.
-        boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
-        if (doLogBlocking) {
-          LlapIoImpl.LOG.trace("next will block");
-        }
-        while (isNothingToReport()) {
-          pendingData.wait(100);
-        }
-        if (doLogBlocking) {
-          LlapIoImpl.LOG.trace("next is unblocked");
-        }
-        rethrowErrorIfAny();
-        lastCvb = pendingData.poll();
-      }
-      if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
-        LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
-      }
-      return lastCvb;
-    }
-
-    private boolean isNothingToReport() {
-      return !isDone && pendingData.isEmpty() && pendingError == null;
-    }
-
-    @Override
-    public NullWritable createKey() {
-      return NullWritable.get();
-    }
-
-    @Override
-    public VectorizedRowBatch createValue() {
-      return rbCtx.createVectorizedRowBatch();
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (LlapIoImpl.LOG.isTraceEnabled()) {
-        LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
-            isClosed, isDone, pendingError, pendingData.size());
-      }
-      LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
-      feedback.stop();
-      rethrowErrorIfAny();
-      MDC.clear();
-    }
-
-    private void rethrowErrorIfAny() throws IOException {
-      if (pendingError == null) return;
-      if (pendingError instanceof IOException) {
-        throw (IOException)pendingError;
-      }
-      throw new IOException(pendingError);
-    }
-
-    @Override
-    public void setDone() {
-      if (LlapIoImpl.LOG.isTraceEnabled()) {
-        LlapIoImpl.LOG.trace("setDone called; closed {}, done {}, err {}, pending {}",
-            isClosed, isDone, pendingError, pendingData.size());
-      }
-      synchronized (pendingData) {
-        isDone = true;
-        pendingData.notifyAll();
-      }
-    }
-
-    @Override
-    public void consumeData(ColumnVectorBatch data) {
-      if (LlapIoImpl.LOG.isTraceEnabled()) {
-        LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
-            isClosed, isDone, pendingError, pendingData.size());
-      }
-      synchronized (pendingData) {
-        if (isClosed) {
-          return;
-        }
-        pendingData.add(data);
-        pendingData.notifyAll();
-      }
-    }
-
-    @Override
-    public void setError(Throwable t) {
-      counters.incrCounter(LlapIOCounters.NUM_ERRORS);
-      LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
-          isClosed, isDone, pendingError, pendingData.size());
-      assert t != null;
-      synchronized (pendingData) {
-        pendingError = t;
-        pendingData.notifyAll();
-      }
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      // TODO: plumb progress info thru the reader if we can get metadata from loader first.
-      return 0.0f;
-    }
-  }
-
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
     return sourceASC == null ? false : sourceASC.shouldSkipCombine(path, conf);
   }
 
-  private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
+  static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
     // This is based on Vectorizer code, minus the validation.
 
     // Add all non-virtual columns from the TableScan operator.
@@ -477,5 +216,4 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
     }
     return tableScanOperator;
   }
-
 }
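
The LlapRecordReader implementation removed above (the class still exists - the new
getRecordReader constructs it with the producer, executor, source format and SerDe, so it has
presumably been moved to its own file elsewhere in this patch) is built around a plain
wait/notify handoff between the IO elevator thread and next(): consumeData() queues a batch and
notifies, setDone()/setError() unblock the reader, and nextCvb() waits while there is nothing to
report. A simplified, self-contained sketch of that handoff pattern follows; the class and
method names are hypothetical, and the back-pressure step (returning the previous batch via
ConsumerFeedback before polling the next one) is omitted.

    import java.util.LinkedList;
    import java.util.Queue;

    /** Hypothetical producer/consumer handoff, in the spirit of pendingData in the reader above. */
    public class BatchHandoff<T> {
      private final Queue<T> pending = new LinkedList<>();
      private Throwable error = null;
      private boolean done = false;

      /** Producer side: queue one batch and wake the consumer. */
      public synchronized void produce(T batch) {
        pending.add(batch);
        notifyAll();
      }

      /** Producer side: no more batches will arrive. */
      public synchronized void markDone() {
        done = true;
        notifyAll();
      }

      /** Producer side: report a failure; the consumer rethrows it. */
      public synchronized void fail(Throwable t) {
        error = t;
        notifyAll();
      }

      /** Consumer side: block until a batch, completion, or an error; null means "done". */
      public synchronized T take() throws InterruptedException {
        while (pending.isEmpty() && !done && error == null) {
          wait(100); // Bounded wait, mirroring pendingData.wait(100) in the reader.
        }
        if (error != null) {
          throw new RuntimeException(error);
        }
        return pending.poll();
      }
    }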

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 8048624..7cfd133 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -47,16 +47,20 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
 import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -74,7 +78,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
 
   private static final String MODE_CACHE = "cache";
 
-  private final ColumnVectorProducer cvp;
+  // TODO: later, we may have a map
+  private final ColumnVectorProducer orcCvp, genericCvp;
   private final ExecutorService executor;
   private final LlapDaemonCacheMetrics cacheMetrics;
   private final LlapDaemonIOMetrics ioMetrics;
@@ -110,6 +115,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
 
     OrcMetadataCache metadataCache = null;
     LowLevelCache cache = null;
+    SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
     BufferUsageManager bufferManager = null;
     if (useLowLevelCache) {
       // Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -124,11 +130,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       this.allocator = allocator;
       LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
           cacheMetrics, cachePolicy, allocator, true);
+      SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
+          cacheMetrics, cachePolicy, allocator);
       cache = cacheImpl;
+      serdeCache = serdeCacheImpl;
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
       metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
-      cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache));
+      cachePolicy.setEvictionListener(new EvictionDispatcher(
+          cache, serdeCacheImpl, metadataCache, allocator));
       cachePolicy.setParentDebugDumper(cacheImpl);
       cacheImpl.startThreads(); // Start the cache threads.
       bufferManager = cacheImpl; // Cache also serves as buffer manager.
@@ -145,8 +155,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
         new LinkedBlockingQueue<Runnable>(),
         new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
     // TODO: this should depends on input format and be in a map, or something.
-    this.cvp = new OrcColumnVectorProducer(
+    this.orcCvp = new OrcColumnVectorProducer(
         metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics);
+    this.genericCvp = new GenericColumnVectorProducer(
+        serdeCache, bufferManager, conf, cacheMetrics, ioMetrics);
     LOG.info("LLAP IO initialized");
 
     registerMXBeans();
@@ -159,8 +171,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   @SuppressWarnings("rawtypes")
   @Override
   public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
-      InputFormat sourceInputFormat) {
-    return new LlapInputFormat(sourceInputFormat, cvp, executor);
+      InputFormat sourceInputFormat, Deserializer sourceSerDe) {
+    ColumnVectorProducer cvp = genericCvp;
+    if (sourceInputFormat instanceof OrcInputFormat) {
+      cvp = orcCvp; // Special-case for ORC.
+    }
+    return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor);
   }
 
   @Override