You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/01/16 23:37:32 UTC

[1/2] orc git commit: ORC-119. Create an API to separate out layout from the writer. (omalley)

Repository: orc
Updated Branches:
  refs/heads/master 955190ffd -> ca7c97ebb


http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
----------------------------------------------------------------------
diff --git a/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out b/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
index 4b0822f..b0315b4 100644
--- a/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ b/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -39,7 +39,7 @@ File Statistics:
   Column 3: count: 21000 hasNull: false min: Darkness,-230 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936 sum: 6910238
 
 Stripes:
-  Stripe: offset: 3 data: 163602 rows: 5000 tail: 68 index: 720
+  Stripe: offset: 3 data: 163585 rows: 5000 tail: 68 index: 720
     Stream: column 0 section ROW_INDEX start: 3 length 17
     Stream: column 1 section ROW_INDEX start: 20 length 166
     Stream: column 2 section ROW_INDEX start: 186 length 171
@@ -47,7 +47,7 @@ Stripes:
     Stream: column 1 section DATA start: 723 length 20035
     Stream: column 2 section DATA start: 20758 length 40050
     Stream: column 3 section DATA start: 60808 length 99226
-    Stream: column 3 section LENGTH start: 160034 length 4291
+    Stream: column 3 section LENGTH start: 160034 length 4274
     Encoding column 0: DIRECT
     Encoding column 1: DIRECT_V2
     Encoding column 2: DIRECT_V2
@@ -70,15 +70,15 @@ Stripes:
       Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660 sum: 75448 positions: 16464,3340,0,1554,14
       Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788 sum: 104868 positions: 36532,964,0,2372,90
       Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744 sum: 136158 positions: 63067,3432,0,3354,108
-  Stripe: offset: 164393 data: 368335 rows: 5000 tail: 69 index: 956
-    Stream: column 0 section ROW_INDEX start: 164393 length 17
-    Stream: column 1 section ROW_INDEX start: 164410 length 157
-    Stream: column 2 section ROW_INDEX start: 164567 length 166
-    Stream: column 3 section ROW_INDEX start: 164733 length 616
-    Stream: column 1 section DATA start: 165349 length 20035
-    Stream: column 2 section DATA start: 185384 length 40050
-    Stream: column 3 section DATA start: 225434 length 302715
-    Stream: column 3 section LENGTH start: 528149 length 5535
+  Stripe: offset: 164376 data: 368332 rows: 5000 tail: 69 index: 956
+    Stream: column 0 section ROW_INDEX start: 164376 length 17
+    Stream: column 1 section ROW_INDEX start: 164393 length 157
+    Stream: column 2 section ROW_INDEX start: 164550 length 166
+    Stream: column 3 section ROW_INDEX start: 164716 length 616
+    Stream: column 1 section DATA start: 165332 length 20035
+    Stream: column 2 section DATA start: 185367 length 40050
+    Stream: column 3 section DATA start: 225417 length 302715
+    Stream: column 3 section LENGTH start: 528132 length 5532
     Encoding column 0: DIRECT
     Encoding column 1: DIRECT_V2
     Encoding column 2: DIRECT_V2
@@ -101,15 +101,15 @@ Stripes:
       Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988 sum: 224740 positions: 94117,3404,0,1945,222
       Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984 sum: 252094 positions: 155111,2864,0,3268,48
       Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938 sum: 281404 positions: 224570,1006,0,4064,342
-  Stripe: offset: 533753 data: 606074 rows: 5000 tail: 69 index: 1427
-    Stream: column 0 section ROW_INDEX start: 533753 length 17
-    Stream: column 1 section ROW_INDEX start: 533770 length 167
-    Stream: column 2 section ROW_INDEX start: 533937 length 168
-    Stream: column 3 section ROW_INDEX start: 534105 length 1075
-    Stream: column 1 section DATA start: 535180 length 20035
-    Stream: column 2 section DATA start: 555215 length 40050
-    Stream: column 3 section DATA start: 595265 length 540210
-    Stream: column 3 section LENGTH start: 1135475 length 5779
+  Stripe: offset: 533733 data: 606071 rows: 5000 tail: 69 index: 1427
+    Stream: column 0 section ROW_INDEX start: 533733 length 17
+    Stream: column 1 section ROW_INDEX start: 533750 length 167
+    Stream: column 2 section ROW_INDEX start: 533917 length 168
+    Stream: column 3 section ROW_INDEX start: 534085 length 1075
+    Stream: column 1 section DATA start: 535160 length 20035
+    Stream: column 2 section DATA start: 555195 length 40050
+    Stream: column 3 section DATA start: 595245 length 540210
+    Stream: column 3 section LENGTH start: 1135455 length 5776
     Encoding column 0: DIRECT
     Encoding column 1: DIRECT_V2
     Encoding column 2: DIRECT_V2
@@ -132,15 +132,15 @@ Stripes:
       Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976 sum: 386538 posit
 ions: 185635,3966,0,2077,162
       Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802
 -12976-13216-13246-13502-13766 sum: 421660 positions: 295550,1384,0,3369,16
       Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298
 -12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974 sum: 453606 positions: 412768,1156,0,4041,470
-  Stripe: offset: 1141323 data: 864001 rows: 5000 tail: 69 index: 1975
-    Stream: column 0 section ROW_INDEX start: 1141323 length 17
-    Stream: column 1 section ROW_INDEX start: 1141340 length 156
-    Stream: column 2 section ROW_INDEX start: 1141496 length 168
-    Stream: column 3 section ROW_INDEX start: 1141664 length 1634
-    Stream: column 1 section DATA start: 1143298 length 20035
-    Stream: column 2 section DATA start: 1163333 length 40050
-    Stream: column 3 section DATA start: 1203383 length 798014
-    Stream: column 3 section LENGTH start: 2001397 length 5902
+  Stripe: offset: 1141300 data: 863962 rows: 5000 tail: 69 index: 1975
+    Stream: column 0 section ROW_INDEX start: 1141300 length 17
+    Stream: column 1 section ROW_INDEX start: 1141317 length 156
+    Stream: column 2 section ROW_INDEX start: 1141473 length 168
+    Stream: column 3 section ROW_INDEX start: 1141641 length 1634
+    Stream: column 1 section DATA start: 1143275 length 20035
+    Stream: column 2 section DATA start: 1163310 length 40050
+    Stream: column 3 section DATA start: 1203360 length 798014
+    Stream: column 3 section LENGTH start: 2001374 length 5863
     Encoding column 0: DIRECT
     Encoding column 1: DIRECT_V2
     Encoding column 2: DIRECT_V2
@@ -163,15 +163,15 @@ Stripes:
       Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9
 650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878 sum: 568274 positions: 286457,302,0,1926,462
       Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-91
 28-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788 sum: 594578 positions: 447943,3328,0,3444,250
       Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214-18444-18446-18724-18912-18952-19164 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8
 390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904 sum: 631944 positions: 616471,3986,3778,547,292
-  Stripe: offset: 2007368 data: 207295 rows: 1000 tail: 67 index: 841
-    Stream: column 0 section ROW_INDEX start: 2007368 length 12
-    Stream: column 1 section ROW_INDEX start: 2007380 length 38
-    Stream: column 2 section ROW_INDEX start: 2007418 length 41
-    Stream: column 3 section ROW_INDEX start: 2007459 length 750
-    Stream: column 1 section DATA start: 2008209 length 4007
-    Stream: column 2 section DATA start: 2012216 length 8010
-    Stream: column 3 section DATA start: 2020226 length 194018
-    Stream: column 3 section LENGTH start: 2214244 length 1260
+  Stripe: offset: 2007306 data: 207282 rows: 1000 tail: 67 index: 841
+    Stream: column 0 section ROW_INDEX start: 2007306 length 12
+    Stream: column 1 section ROW_INDEX start: 2007318 length 38
+    Stream: column 2 section ROW_INDEX start: 2007356 length 41
+    Stream: column 3 section ROW_INDEX start: 2007397 length 750
+    Stream: column 1 section DATA start: 2008147 length 4007
+    Stream: column 2 section DATA start: 2012154 length 8010
+    Stream: column 3 section DATA start: 2020164 length 194018
+    Stream: column 3 section LENGTH start: 2214182 length 1247
     Encoding column 0: DIRECT
     Encoding column 1: DIRECT_V2
     Encoding column 2: DIRECT_V2
@@ -183,7 +183,7 @@ Stripes:
     Row group indices for column 3:
       Entry 0: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214-18444-18446-18724-18912-18952-19164-19348-19400-19546-19776-19896-20084 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-
 7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936 sum: 670762 positions: 0,0,0,0,0
 
-File length: 2217685 bytes
+File length: 2217611 bytes
 Padding length: 0 bytes
 Padding ratio: 0%
 ________________________________________________________________________________________________________________________


[2/2] orc git commit: ORC-119. Create an API to separate out layout from the writer. (omalley)

Posted by om...@apache.org.
ORC-119. Create an API to separate out layout from the writer. (omalley)

Fixes #76

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/ca7c97eb
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/ca7c97eb
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/ca7c97eb

Branch: refs/heads/master
Commit: ca7c97ebb7e5e97a9bc45894d377baa09f637e27
Parents: 955190f
Author: Owen O'Malley <om...@apache.org>
Authored: Fri Dec 9 13:30:56 2016 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jan 16 15:33:42 2017 -0800

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcFile.java  |  18 +
 .../src/java/org/apache/orc/PhysicalWriter.java | 133 ++++
 .../org/apache/orc/impl/BitFieldWriter.java     |   4 +
 .../java/org/apache/orc/impl/IntegerWriter.java |   6 +
 .../src/java/org/apache/orc/impl/OutStream.java |  52 +-
 .../org/apache/orc/impl/PhysicalFsWriter.java   | 386 ++++++++++
 .../apache/orc/impl/RunLengthByteWriter.java    |   4 +
 .../apache/orc/impl/RunLengthIntegerWriter.java |   4 +
 .../orc/impl/RunLengthIntegerWriterV2.java      |   5 +
 .../java/org/apache/orc/impl/WriterImpl.java    | 740 ++++++++-----------
 .../test/org/apache/orc/TestVectorOrcFile.java  |   4 +-
 .../test/org/apache/orc/impl/TestInStream.java  |   8 +-
 .../test/org/apache/orc/impl/TestOutStream.java |   5 +-
 .../orc-file-dump-dictionary-threshold.out      |  78 +-
 14 files changed, 923 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index 7551c2e..68e49f3 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -298,6 +298,7 @@ public class OrcFile {
     private String bloomFilterColumns;
     private double bloomFilterFpp;
     private BloomFilterVersion bloomFilterVersion;
+    private PhysicalWriter physicalWriter;
 
     protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
@@ -487,6 +488,19 @@ public class OrcFile {
     }
 
     /**
+     * Change the physical writer of the ORC file.
+     *
+     * SHOULD ONLY BE USED BY LLAP.
+     *
+     * @param writer the writer to control the layout and persistence
+     * @return this
+     */
+    public WriterOptions physicalWriter(PhysicalWriter writer) {
+      this.physicalWriter = writer;
+      return this;
+    }
+
+    /**
      * A package local option to set the memory manager.
      */
     protected WriterOptions memory(MemoryManager value) {
@@ -569,6 +583,10 @@ public class OrcFile {
     public BloomFilterVersion getBloomFilterVersion() {
       return bloomFilterVersion;
     }
+
+    public PhysicalWriter getPhysicalWriter() {
+      return physicalWriter;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java
new file mode 100644
index 0000000..9953d41
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.orc.impl.StreamName;
+
+/**
+ * This interface separates the physical layout of ORC files from the higher
+ * level details.
+ *
+ * This API is limited to being used by LLAP.
+ */
+public interface PhysicalWriter {
+
+  /**
+   * The target of an output stream.
+   */
+  interface OutputReceiver {
+    /**
+     * Output the given buffer to the final destination
+     *
+     * @param buffer the buffer to output
+     * @throws IOException
+     */
+    void output(ByteBuffer buffer) throws IOException;
+
+    /**
+     * Suppress this stream from being written to the stripe.
+     */
+    void suppress();
+  }
+  /**
+   * Writes the header of the file, which consists of the magic "ORC" bytes.
+   * @throws IOException
+   */
+  void writeHeader() throws IOException;
+
+  /**
+   * Create an OutputReceiver for the given name.
+   * @param name the name of the stream
+   * @throws IOException
+   */
+  OutputReceiver createDataStream(StreamName name) throws IOException;
+
+  /**
+   * Write an index in the given stream name.
+   * @param name the name of the stream
+   * @param index the row index to write
+   * @param codec the compression codec to use
+   */
+  void writeIndex(StreamName name,
+                  OrcProto.RowIndex.Builder index,
+                  CompressionCodec codec) throws IOException;
+
+  /**
+   * Write a bloom filter index in the given stream name.
+   * @param name the name of the stream
+   * @param bloom the bloom filter to write
+   * @param codec the compression codec to use
+   */
+  void writeBloomFilter(StreamName name,
+                        OrcProto.BloomFilterIndex.Builder bloom,
+                        CompressionCodec codec) throws IOException;
+
+  /**
+   * Flushes the data in all the streams, spills them to disk, and writes out
+   * the stripe footer.
+   * @param footer Stripe footer to be updated with relevant data and written out.
+   * @param dirEntry File metadata entry for the stripe, to be updated with
+   *                 relevant data.
+   */
+  void finalizeStripe(OrcProto.StripeFooter.Builder footer,
+                      OrcProto.StripeInformation.Builder dirEntry) throws IOException;
+
+  /**
+   * Writes out the file metadata.
+   * @param builder Metadata builder to finalize and write.
+   */
+  void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException;
+
+  /**
+   * Writes out the file footer.
+   * @param builder Footer builder to finalize and write.
+   */
+  void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException;
+
+  /**
+   * Writes out the postscript (including the size byte if needed).
+   * @param builder Postscript builder to finalize and write.
+   */
+  long writePostScript(OrcProto.PostScript.Builder builder) throws IOException;
+
+  /**
+   * Closes the writer.
+   */
+  void close() throws IOException;
+
+  /**
+   * Flushes the writer so that readers can see the preceding postscripts.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Appends raw stripe data (e.g. for file merger).
+   * @param stripe Stripe data buffer.
+   * @param dirEntry File metadata entry for the stripe, to be updated with
+   *                 relevant data.
+   * @throws IOException
+   */
+  void appendRawStripe(ByteBuffer stripe,
+                       OrcProto.StripeInformation.Builder dirEntry
+                       ) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java b/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
index aa5f886..3c8070f 100644
--- a/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
@@ -70,4 +70,8 @@ public class BitFieldWriter {
     output.getPosition(recorder);
     recorder.addPosition(8 - bitsLeft);
   }
+
+  public long estimateMemory() {
+    return output.estimateMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
index 419054f..70b16d3 100644
--- a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
@@ -44,4 +44,10 @@ public interface IntegerWriter {
    * @throws IOException
    */
   void flush() throws IOException;
+
+  /**
+   * Estimate the amount of memory being used.
+   * @return number of bytes
+   */
+  long estimateMemory();
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index 81662cc..a1131e4 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -18,26 +18,16 @@
 package org.apache.orc.impl;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.PhysicalWriter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class OutStream extends PositionedOutputStream {
 
-  public interface OutputReceiver {
-    /**
-     * Output the given buffer to the final destination
-     * @param buffer the buffer to output
-     * @throws IOException
-     */
-    void output(ByteBuffer buffer) throws IOException;
-  }
-
   public static final int HEADER_SIZE = 3;
   private final String name;
-  private final OutputReceiver receiver;
-  // if enabled the stream will be suppressed when writing stripe
-  private boolean suppress;
+  private final PhysicalWriter.OutputReceiver receiver;
 
   /**
    * Stores the uncompressed bytes that have been serialized, but not
@@ -69,17 +59,15 @@ public class OutStream extends PositionedOutputStream {
   public OutStream(String name,
                    int bufferSize,
                    CompressionCodec codec,
-                   OutputReceiver receiver) throws IOException {
+                   PhysicalWriter.OutputReceiver receiver) throws IOException {
     this.name = name;
     this.bufferSize = bufferSize;
     this.codec = codec;
     this.receiver = receiver;
-    this.suppress = false;
   }
 
   public void clear() throws IOException {
     flush();
-    suppress = false;
   }
 
   /**
@@ -258,32 +246,28 @@ public class OutStream extends PositionedOutputStream {
 
   @Override
   public long getBufferSize() {
-    long result = 0;
-    if (current != null) {
-      result += current.capacity();
-    }
-    if (compressed != null) {
-      result += compressed.capacity();
-    }
-    if (overflow != null) {
-      result += overflow.capacity();
+    if (codec == null) {
+      return uncompressedBytes + (current == null ? 0 : current.remaining());
+    } else {
+      long result = 0;
+      if (current != null) {
+        result += current.capacity();
+      }
+      if (compressed != null) {
+        result += compressed.capacity();
+      }
+      if (overflow != null) {
+        result += overflow.capacity();
+      }
+      return result + compressedBytes;
     }
-    return result;
   }
 
   /**
    * Set suppress flag
    */
   public void suppress() {
-    suppress = true;
-  }
-
-  /**
-   * Returns the state of suppress flag
-   * @return value of suppress flag
-   */
-  public boolean isSuppressed() {
-    return suppress;
+    receiver.suppress();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 0000000..48a0b42
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PhysicalFsWriter implements PhysicalWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
+
+  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+  private final FSDataOutputStream rawWriter;
+  // the compressed metadata information outStream
+  private OutStream writer = null;
+  // a protobuf output stream wrapping the metadata writer above
+  private CodedOutputStream protobufWriter = null;
+
+  private final Path path;
+  private final long blockSize;
+  private final int bufferSize;
+  private final double paddingTolerance;
+  private final long defaultStripeSize;
+  private final CompressionKind compress;
+  private final boolean addBlockPadding;
+
+  // the streams that make up the current stripe
+  private final Map<StreamName, BufferedStream> streams =
+    new TreeMap<>();
+
+  private long adjustedStripeSize;
+  private long headerLength;
+  private long stripeStart;
+  private int metadataLength;
+  private int footerLength;
+
+  public PhysicalFsWriter(FileSystem fs,
+                          Path path,
+                          OrcFile.WriterOptions opts) throws IOException {
+    this.path = path;
+    this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
+    this.addBlockPadding = opts.getBlockPadding();
+    if (opts.isEnforceBufferSize()) {
+      this.bufferSize = opts.getBufferSize();
+    } else {
+      this.bufferSize = WriterImpl.getEstimatedBufferSize(defaultStripeSize,
+          opts.getSchema().getMaximumId() + 1,
+          opts.getBufferSize());
+    }
+    this.compress = opts.getCompress();
+    this.paddingTolerance = opts.getPaddingTolerance();
+    this.blockSize = opts.getBlockSize();
+    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+        " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+        compress, bufferSize);
+    rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+        fs.getDefaultReplication(path), blockSize);
+    CompressionCodec codec = WriterImpl.createCodec(compress);
+    writer = new OutStream("metadata", bufferSize, codec,
+        new DirectStream(rawWriter));
+    protobufWriter = CodedOutputStream.newInstance(writer);
+  }
+
+  private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
+    this.stripeStart = rawWriter.getPos();
+    final long currentStripeSize = indexSize + dataSize + footerSize;
+    final long available = blockSize - (stripeStart % blockSize);
+    final long overflow = currentStripeSize - adjustedStripeSize;
+    final float availRatio = (float) available / (float) defaultStripeSize;
+
+    if (availRatio > 0.0f && availRatio < 1.0f
+        && availRatio > paddingTolerance) {
+      // adjust default stripe size to fit into remaining space, also adjust
+      // the next stripe for correction based on the current stripe size
+      // and user specified padding tolerance. Since stripe size can overflow
+      // the default stripe size we should apply this correction to avoid
+      // writing portion of last stripe to next hdfs block.
+      double correction = overflow > 0 ? (double) overflow
+          / (double) adjustedStripeSize : 0.0;
+
+      // correction should not be greater than user specified padding
+      // tolerance
+      correction = correction > paddingTolerance ? paddingTolerance
+          : correction;
+
+      // adjust next stripe size based on current stripe estimate correction
+      adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
+    } else if (availRatio >= 1.0) {
+      adjustedStripeSize = defaultStripeSize;
+    }
+
+    if (availRatio < paddingTolerance && addBlockPadding) {
+      long padding = blockSize - (stripeStart % blockSize);
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+      LOG.info(String.format("Padding ORC by %d bytes (<=  %.2f * %d)",
+          padding, availRatio, defaultStripeSize));
+      stripeStart += padding;
+      while (padding > 0) {
+        int writeLen = (int) Math.min(padding, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        padding -= writeLen;
+      }
+      adjustedStripeSize = defaultStripeSize;
+    } else if (currentStripeSize < blockSize
+        && (stripeStart % blockSize) + currentStripeSize > blockSize) {
+      // even if you don't pad, reset the default stripe size when crossing a
+      // block boundary
+      adjustedStripeSize = defaultStripeSize;
+    }
+  }
+
+  /**
+   * An output receiver that writes the ByteBuffers to the output stream
+   * as they are received.
+   */
+  private class DirectStream implements OutputReceiver {
+    private final FSDataOutputStream output;
+
+    DirectStream(FSDataOutputStream output) {
+      this.output = output;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+          buffer.remaining());
+    }
+
+    @Override
+    public void suppress() {
+      throw new UnsupportedOperationException("Can't suppress direct stream");
+    }
+  }
+
+  private void writeStripeFooter(OrcProto.StripeFooter footer,
+                                 long dataSize,
+                                 long indexSize,
+                                 OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+    footer.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    dirEntry.setOffset(stripeStart);
+    dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize);
+  }
+
+  @Override
+  public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+    long startPosn = rawWriter.getPos();
+    OrcProto.Metadata metadata = builder.build();
+    metadata.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    this.metadataLength = (int) (rawWriter.getPos() - startPosn);
+  }
+
+  @Override
+  public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+    long bodyLength = rawWriter.getPos() - metadataLength;
+    builder.setContentLength(bodyLength);
+    builder.setHeaderLength(headerLength);
+    long startPosn = rawWriter.getPos();
+    OrcProto.Footer footer = builder.build();
+    footer.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    this.footerLength = (int) (rawWriter.getPos() - startPosn);
+  }
+
+  @Override
+  public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+    builder.setFooterLength(footerLength);
+    builder.setMetadataLength(metadataLength);
+    if (compress != CompressionKind.NONE) {
+      builder.setCompressionBlockSize(bufferSize);
+    }
+    OrcProto.PostScript ps = builder.build();
+    // need to write this uncompressed
+    long startPosn = rawWriter.getPos();
+    ps.writeTo(rawWriter);
+    long length = rawWriter.getPos() - startPosn;
+    if (length > 255) {
+      throw new IllegalArgumentException("PostScript too large at " + length);
+    }
+    rawWriter.writeByte((int)length);
+    return rawWriter.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawWriter.close();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    rawWriter.hflush();
+  }
+
+  @Override
+  public void appendRawStripe(ByteBuffer buffer,
+      OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+    long start = rawWriter.getPos();
+    int length = buffer.remaining();
+    long availBlockSpace = blockSize - (start % blockSize);
+
+    // see if stripe can fit in the current hdfs block, else pad the remaining
+    // space in the block
+    if (length < blockSize && length > availBlockSpace &&
+        addBlockPadding) {
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+      LOG.info(String.format("Padding ORC by %d bytes while merging..",
+          availBlockSpace));
+      start += availBlockSpace;
+      while (availBlockSpace > 0) {
+        int writeLen = (int) Math.min(availBlockSpace, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        availBlockSpace -= writeLen;
+      }
+    }
+    rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+        length);
+    dirEntry.setOffset(start);
+  }
+
+
+  /**
+   * This class is used to hold the contents of streams as they are buffered.
+   * The TreeWriters write to the outStream and the codec compresses the
+   * data as buffers fill up and stores them in the output list. When the
+   * stripe is being written, the whole stream is written to the file.
+   */
+  private static final class BufferedStream implements OutputReceiver {
+    private boolean isSuppressed = false;
+    private final List<ByteBuffer> output = new ArrayList<>();
+
+    @Override
+    public void output(ByteBuffer buffer) {
+      if (!isSuppressed) {
+        output.add(buffer);
+      }
+    }
+
+    public void suppress() {
+      isSuppressed = true;
+      output.clear();
+    }
+
+    /**
+     * Write any saved buffers to the output stream unless this stream is
+     * suppressed, clear the buffers, and reset the suppress flag.
+     */
+    void spillToDiskAndClear(FSDataOutputStream raw
+                                       ) throws IOException {
+      if (!isSuppressed) {
+        for (ByteBuffer buffer: output) {
+          raw.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+            buffer.remaining());
+        }
+        output.clear();
+      }
+      isSuppressed = false;
+    }
+
+    /**
+     * Get the number of bytes that will be written to the output.
+     *
+     * Assumes the stream writing into this receiver has already been flushed.
+     * @return number of bytes
+     */
+    public long getOutputSize() {
+      long result = 0;
+      for (ByteBuffer buffer: output) {
+        result += buffer.remaining();
+      }
+      return result;
+    }
+  }
+
+  @Override
+  public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
+                             OrcProto.StripeInformation.Builder dirEntry
+                             ) throws IOException {
+    long indexSize = 0;
+    long dataSize = 0;
+    for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+      BufferedStream receiver = pair.getValue();
+      if (!receiver.isSuppressed) {
+        long streamSize = receiver.getOutputSize();
+        StreamName name = pair.getKey();
+        footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+            .setKind(name.getKind()).setLength(streamSize));
+        if (StreamName.Area.INDEX == name.getArea()) {
+          indexSize += streamSize;
+        } else {
+          dataSize += streamSize;
+        }
+      }
+    }
+    dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+
+    OrcProto.StripeFooter footer = footerBuilder.build();
+    // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+    padStripe(indexSize, dataSize, footer.getSerializedSize());
+
+    // write out the data streams
+    for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+      pair.getValue().spillToDiskAndClear(rawWriter);
+    }
+    // Write out the footer.
+    writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+  }
+
+  @Override
+  public void writeHeader() throws IOException {
+    rawWriter.writeBytes(OrcFile.MAGIC);
+    headerLength = rawWriter.getPos();
+  }
+
+  @Override
+  public BufferedStream createDataStream(StreamName name) {
+    BufferedStream result = streams.get(name);
+    if (result == null) {
+      result = new BufferedStream();
+      streams.put(name, result);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeIndex(StreamName name,
+                         OrcProto.RowIndex.Builder index,
+                         CompressionCodec codec) throws IOException {
+    OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
+        createDataStream(name));
+    index.build().writeTo(stream);
+    stream.flush();
+  }
+
+  @Override
+  public void writeBloomFilter(StreamName name,
+                               OrcProto.BloomFilterIndex.Builder bloom,
+                               CompressionCodec codec) throws IOException {
+    OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
+        createDataStream(name));
+    bloom.build().writeTo(stream);
+    stream.flush();
+  }
+
+  @Override
+  public String toString() {
+    return path.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
index 09108b2..c2f1fa7 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
@@ -103,4 +103,8 @@ public class RunLengthByteWriter {
     output.getPosition(recorder);
     recorder.addPosition(numLiterals);
   }
+
+  public long estimateMemory() {
+    return output.getBufferSize() + MAX_LITERAL_SIZE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
index 3e5f2e2..88b47e6 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
@@ -140,4 +140,8 @@ public class RunLengthIntegerWriter implements IntegerWriter {
     recorder.addPosition(numLiterals);
   }
 
+  @Override
+  public long estimateMemory() {
+    return output.getBufferSize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
index 183fd8e..29cbebf 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
@@ -818,4 +818,9 @@ public class RunLengthIntegerWriterV2 implements IntegerWriter {
     output.getPosition(recorder);
     recorder.addPosition(numLiterals);
   }
+
+  @Override
+  public long estimateMemory() {
+    return output.getBufferSize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 22ff867..c364ca0 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -19,7 +19,6 @@
 package org.apache.orc.impl;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
@@ -39,23 +38,23 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.orc.BinaryColumnStatistics;
 import org.apache.orc.ColumnStatistics;
-import org.apache.orc.util.BloomFilter;
-import org.apache.orc.util.BloomFilterIO;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcUtils;
+import org.apache.orc.PhysicalWriter;
 import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
 import org.apache.orc.util.BloomFilterUtf8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -73,7 +72,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.io.Text;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -98,40 +96,28 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
   private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
 
-  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
   // threshold above which buffer size will be automatically resized
   private static final int COLUMN_COUNT_THRESHOLD = 1000;
 
-  private final FileSystem fs;
   private final Path path;
   private final long defaultStripeSize;
   private long adjustedStripeSize;
   private final int rowIndexStride;
   private final CompressionKind compress;
   private final CompressionCodec codec;
-  private final boolean addBlockPadding;
   private final int bufferSize;
   private final long blockSize;
-  private final double paddingTolerance;
   private final TypeDescription schema;
+  private final PhysicalWriter physicalWriter;
 
-  // the streams that make up the current stripe
-  private final Map<StreamName, BufferedStream> streams =
-    new TreeMap<StreamName, BufferedStream>();
-
-  private FSDataOutputStream rawWriter = null;
-  // the compressed metadata information outStream
-  private OutStream writer = null;
-  // a protobuf outStream around streamFactory
-  private CodedOutputStream protobufWriter = null;
-  private long headerLength;
   private int columnCount;
   private long rowCount = 0;
   private long rowsInStripe = 0;
   private long rawDataSize = 0;
   private int rowsInIndex = 0;
+  private long lastFlushOffset = 0;
   private int stripesAtLastFlush = -1;
   private final List<OrcProto.StripeInformation> stripes =
     new ArrayList<OrcProto.StripeInformation>();
@@ -155,7 +141,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   public WriterImpl(FileSystem fs,
                     Path path,
                     OrcFile.WriterOptions opts) throws IOException {
-    this.fs = fs;
     this.path = path;
     this.conf = opts.getConfiguration();
     this.callback = opts.getCallback();
@@ -177,9 +162,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.version = opts.getVersion();
     this.encodingStrategy = opts.getEncodingStrategy();
     this.compressionStrategy = opts.getCompressionStrategy();
-    this.addBlockPadding = opts.getBlockPadding();
     this.blockSize = opts.getBlockSize();
-    this.paddingTolerance = opts.getPaddingTolerance();
     this.compress = opts.getCompress();
     this.rowIndexStride = opts.getRowIndexStride();
     this.memoryManager = opts.getMemoryManager();
@@ -200,6 +183,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
     }
     this.bloomFilterFpp = opts.getBloomFilterFpp();
+    this.physicalWriter = opts.getPhysicalWriter() == null ?
+        new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
+    physicalWriter.writeHeader();
     treeWriter = createTreeWriter(schema, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
@@ -273,10 +259,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   @Override
   public boolean checkMemory(double newScale) throws IOException {
     long limit = (long) Math.round(adjustedStripeSize * newScale);
-    long size = estimateStripeSize();
+    long size = treeWriter.estimateMemory();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
-                limit);
+      LOG.debug("ORC writer " + physicalWriter + " size = " + size +
+          " limit = " + limit);
     }
     if (size > limit) {
       flushStripe();
@@ -285,116 +271,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     return false;
   }
 
-  /**
-   * This class is used to hold the contents of streams as they are buffered.
-   * The TreeWriters write to the outStream and the codec compresses the
-   * data as buffers fill up and stores them in the output list. When the
-   * stripe is being written, the whole stream is written to the file.
-   */
-  private class BufferedStream implements OutStream.OutputReceiver {
-    private final OutStream outStream;
-    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
-    BufferedStream(String name, int bufferSize,
-                   CompressionCodec codec) throws IOException {
-      outStream = new OutStream(name, bufferSize, codec, this);
-    }
-
-    /**
-     * Receive a buffer from the compression codec.
-     * @param buffer the buffer to save
-     */
-    @Override
-    public void output(ByteBuffer buffer) {
-      output.add(buffer);
-    }
-
-    /**
-     * Get the number of bytes in buffers that are allocated to this stream.
-     * @return number of bytes in buffers
-     */
-    public long getBufferSize() {
-      long result = 0;
-      for(ByteBuffer buf: output) {
-        result += buf.capacity();
-      }
-      return outStream.getBufferSize() + result;
-    }
-
-    /**
-     * Flush the stream to the codec.
-     * @throws IOException
-     */
-    public void flush() throws IOException {
-      outStream.flush();
-    }
-
-    /**
-     * Clear all of the buffers.
-     * @throws IOException
-     */
-    public void clear() throws IOException {
-      outStream.clear();
-      output.clear();
-    }
-
-    /**
-     * Check the state of suppress flag in output stream
-     * @return value of suppress flag
-     */
-    public boolean isSuppressed() {
-      return outStream.isSuppressed();
-    }
-
-    /**
-     * Get the number of bytes that will be written to the output. Assumes
-     * the stream has already been flushed.
-     * @return the number of bytes
-     */
-    public long getOutputSize() {
-      long result = 0;
-      for(ByteBuffer buffer: output) {
-        result += buffer.remaining();
-      }
-      return result;
-    }
-
-    /**
-     * Write the saved compressed buffers to the OutputStream.
-     * @param out the stream to write to
-     * @throws IOException
-     */
-    void spillTo(OutputStream out) throws IOException {
-      for(ByteBuffer buffer: output) {
-        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-          buffer.remaining());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return outStream.toString();
-    }
-  }
-
-  /**
-   * An output receiver that writes the ByteBuffers to the output stream
-   * as they are received.
-   */
-  private class DirectStream implements OutStream.OutputReceiver {
-    private final FSDataOutputStream output;
-
-    DirectStream(FSDataOutputStream output) {
-      this.output = output;
-    }
-
-    @Override
-    public void output(ByteBuffer buffer) throws IOException {
-      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-        buffer.remaining());
-    }
-  }
-
   private static class RowIndexPositionRecorder implements PositionRecorder {
     private final OrcProto.RowIndexEntry.Builder builder;
 
@@ -408,35 +284,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  /**
-   * Interface from the Writer to the TreeWriters. This limits the visibility
-   * that the TreeWriters have into the Writer.
-   */
-  private class StreamFactory {
-    /**
-     * Create a stream to store part of a column.
-     * @param column the column id for the stream
-     * @param kind the kind of stream
-     * @return The output outStream that the section needs to be written to.
-     * @throws IOException
-     */
-    public OutStream createStream(int column,
-                                  OrcProto.Stream.Kind kind
-                                  ) throws IOException {
-      final StreamName name = new StreamName(column, kind);
-      final EnumSet<CompressionCodec.Modifier> modifiers;
-
+  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
+    CompressionCodec result = codec;
+    if (codec != null) {
       switch (kind) {
         case BLOOM_FILTER:
         case DATA:
         case DICTIONARY_DATA:
         case BLOOM_FILTER_UTF8:
-          if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
-            modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
-                CompressionCodec.Modifier.TEXT);
+          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
+            result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+                CompressionCodec.Modifier.TEXT));
           } else {
-            modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
-                CompressionCodec.Modifier.TEXT);
+            result = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+                CompressionCodec.Modifier.TEXT));
           }
           break;
         case LENGTH:
@@ -445,22 +306,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         case ROW_INDEX:
         case SECONDARY:
           // easily compressed using the fastest modes
-          modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
-              CompressionCodec.Modifier.BINARY);
+          result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+              CompressionCodec.Modifier.BINARY));
           break;
         default:
-          LOG.warn("Missing ORC compression modifiers for " + kind);
-          modifiers = null;
+          LOG.info("Missing ORC compression modifiers for " + kind);
           break;
       }
+    }
+    return result;
+  }
 
-      BufferedStream result = streams.get(name);
-      if (result == null) {
-        result = new BufferedStream(name.toString(), bufferSize,
-            codec == null ? codec : codec.modify(modifiers));
-        streams.put(name, result);
-      }
-      return result.outStream;
+  /**
+   * Interface from the Writer to the TreeWriters. This limits the visibility
+   * that the TreeWriters have into the Writer.
+   */
+  private class StreamFactory {
+    /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return The output outStream that the section needs to be written to.
+     * @throws IOException
+     */
+    public OutStream createStream(int column,
+                                  OrcProto.Stream.Kind kind
+                                  ) throws IOException {
+      final StreamName name = new StreamName(column, kind);
+      CompressionCodec codec = getCustomizedCodec(kind);
+
+      return new OutStream(physicalWriter.toString(), bufferSize, codec,
+          physicalWriter.createDataStream(name));
     }
 
     /**
@@ -552,6 +428,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     public OrcFile.BloomFilterVersion getBloomFilterVersion() {
       return bloomFilterVersion;
     }
+
+    public void writeIndex(StreamName name,
+                           OrcProto.RowIndex.Builder index) throws IOException {
+      physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
+    }
+
+    public void writeBloomFilter(StreamName name,
+                                 OrcProto.BloomFilterIndex.Builder bloom
+                                 ) throws IOException {
+      physicalWriter.writeBloomFilter(name, bloom,
+          getCustomizedCodec(name.getKind()));
+    }
   }
 
   /**
@@ -571,9 +459,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     protected final RowIndexPositionRecorder rowIndexPosition;
     private final OrcProto.RowIndex.Builder rowIndex;
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
-    private final PositionedOutputStream rowIndexStream;
-    private final PositionedOutputStream bloomFilterStream;
-    private final PositionedOutputStream bloomFilterStreamUtf8;
     protected final BloomFilter bloomFilter;
     protected final BloomFilterUtf8 bloomFilterUtf8;
     protected final boolean createBloomFilter;
@@ -613,39 +498,33 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       stripeColStatistics = ColumnStatisticsImpl.create(schema);
       fileStatistics = ColumnStatisticsImpl.create(schema);
       childrenWriters = new TreeWriter[0];
-      rowIndex = OrcProto.RowIndex.newBuilder();
-      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
-      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
-      stripeStatsBuilders = new ArrayList<>();
       if (streamFactory.buildIndex()) {
-        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+        rowIndex = OrcProto.RowIndex.newBuilder();
+        rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+        rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
       } else {
-        rowIndexStream = null;
+        rowIndex = null;
+        rowIndexEntry = null;
+        rowIndexPosition = null;
       }
+      stripeStatsBuilders = new ArrayList<>();
       if (createBloomFilter) {
         bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
         if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
           bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
               streamFactory.getBloomFilterFPP());
           bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
-          bloomFilterStream = streamFactory.createStream(id,
-              OrcProto.Stream.Kind.BLOOM_FILTER);;
         } else {
           bloomFilter = null;
           bloomFilterIndex = null;
-          bloomFilterStream = null;
         }
         bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
             streamFactory.getBloomFilterFPP());
         bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
-        bloomFilterStreamUtf8 = streamFactory.createStream(id,
-              OrcProto.Stream.Kind.BLOOM_FILTER_UTF8);;
       } else {
         bloomFilterEntry = null;
         bloomFilterIndex = null;
         bloomFilterIndexUtf8 = null;
-        bloomFilterStreamUtf8 = null;
-        bloomFilterStream = null;
         bloomFilter = null;
         bloomFilterUtf8 = null;
       }
@@ -779,7 +658,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           isPresentOutStream.suppress();
           // since isPresent bitstream is suppressed, update the index to
           // remove the positions of the isPresent stream
-          if (rowIndexStream != null) {
+          if (rowIndex != null) {
             removeIsPresentPositions();
           }
         }
@@ -798,28 +677,27 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       if (streamFactory.hasWriterTimeZone()) {
         builder.setWriterTimezone(TimeZone.getDefault().getID());
       }
-      if (rowIndexStream != null) {
+      if (rowIndex != null) {
         if (rowIndex.getEntryCount() != requiredIndexEntries) {
           throw new IllegalArgumentException("Column has wrong number of " +
                "index entries found: " + rowIndex.getEntryCount() + " expected: " +
                requiredIndexEntries);
         }
-        rowIndex.build().writeTo(rowIndexStream);
-        rowIndexStream.flush();
+        streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+        rowIndex.clear();
+        rowIndexEntry.clear();
       }
-      rowIndex.clear();
-      rowIndexEntry.clear();
 
       // write the bloom filter to out stream
-      if (bloomFilterStream != null) {
-        bloomFilterIndex.build().writeTo(bloomFilterStream);
-        bloomFilterStream.flush();
+      if (bloomFilterIndex != null) {
+        streamFactory.writeBloomFilter(new StreamName(id,
+            OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
         bloomFilterIndex.clear();
       }
       // write the bloom filter to out stream
-      if (bloomFilterStreamUtf8 != null) {
-        bloomFilterIndexUtf8.build().writeTo(bloomFilterStreamUtf8);
-        bloomFilterStreamUtf8.flush();
+      if (bloomFilterIndexUtf8 != null) {
+        streamFactory.writeBloomFilter(new StreamName(id,
+            OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
         bloomFilterIndexUtf8.clear();
       }
     }
@@ -899,6 +777,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      */
     long estimateMemory() {
       long result = 0;
+      if (isPresent != null) {
+        result = isPresentOutStream.getBufferSize();
+      }
       for (TreeWriter child: childrenWriters) {
         result += child.estimateMemory();
       }
@@ -917,7 +798,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.writer = new BitFieldWriter(out, 1);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -949,7 +832,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
       writer.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -957,6 +842,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       writer.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus what the data writer reports
+      return super.estimateMemory() + writer.estimateMemory();
+    }
   }
 
   private static class ByteTreeWriter extends TreeWriter {
@@ -969,7 +859,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super(columnId, schema, writer, nullable);
       this.writer = new RunLengthByteWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA));
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1013,7 +905,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
       writer.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1021,6 +915,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       writer.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus what the byte run-length writer reports
+      return super.estimateMemory() + writer.estimateMemory();
+    }
   }
 
   private static class IntegerTreeWriter extends TreeWriter {
@@ -1036,7 +935,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1090,7 +991,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
       writer.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1098,6 +1001,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       writer.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus what the integer writer reports
+      return super.estimateMemory() + writer.estimateMemory();
+    }
   }
 
   private static class FloatTreeWriter extends TreeWriter {
@@ -1112,7 +1020,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.utils = new SerializationUtils();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1157,7 +1067,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
       stream.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1165,6 +1077,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       stream.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus the DATA stream's buffer size
+      return super.estimateMemory() + stream.getBufferSize();
+    }
   }
 
   private static class DoubleTreeWriter extends TreeWriter {
@@ -1179,7 +1096,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.utils = new SerializationUtils();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1223,7 +1142,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
       stream.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1231,18 +1152,22 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       stream.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus the DATA stream's buffer size
+      return super.estimateMemory() + stream.getBufferSize();
+    }
   }
 
   private static abstract class StringBaseTreeWriter extends TreeWriter {
     private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final OutStream stringOutput;
-    private final IntegerWriter lengthOutput;
+    protected final IntegerWriter lengthOutput;
     private final IntegerWriter rowOutput;
     protected final StringRedBlackTree dictionary =
         new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
     protected final DynamicIntArray rows = new DynamicIntArray();
     protected final PositionedOutputStream directStreamOutput;
-    protected final IntegerWriter directLengthOutput;
     private final List<OrcProto.RowIndexEntry> savedRowIndex =
         new ArrayList<OrcProto.RowIndexEntry>();
     private final boolean buildIndex;
@@ -1261,18 +1186,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      boolean nullable) throws IOException {
       super(columnId, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
+      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
       stringOutput = writer.createStream(id,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
       lengthOutput = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      rowOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
+          writer);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
       rowIndexValueCount.add(0L);
       buildIndex = writer.buildIndex();
-      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      directLengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       Configuration conf = writer.getConfiguration();
       dictionaryKeySizeThreshold =
           OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
@@ -1315,16 +1240,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       // we need to build the rowindex before calling super, since it
       // writes it out.
       super.writeStripe(builder, requiredIndexEntries);
-      stringOutput.flush();
-      lengthOutput.flush();
-      rowOutput.flush();
-      directStreamOutput.flush();
-      directLengthOutput.flush();
+      if (useDictionaryEncoding) {
+        stringOutput.flush();
+        lengthOutput.flush();
+        rowOutput.flush();
+      } else {
+        directStreamOutput.flush();
+        lengthOutput.flush();
+      }
       // reset all of the fields to be ready for the next stripe.
       dictionary.clear();
       savedRowIndex.clear();
       rowIndexValueCount.clear();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
       rowIndexValueCount.add(0L);
 
       if (!useDictionaryEncoding) {
@@ -1374,7 +1304,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             } else {
               PositionRecorder posn = new RowIndexPositionRecorder(base);
               directStreamOutput.getPosition(posn);
-              directLengthOutput.getPosition(posn);
+              lengthOutput.getPosition(posn);
             }
             rowIndex.addEntry(base.build());
           }
@@ -1385,7 +1315,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           } else {
             dictionary.getText(text, rows.get(i));
             directStreamOutput.write(text.getBytes(), 0, text.getLength());
-            directLengthOutput.write(text.getLength());
+            lengthOutput.write(text.getLength());
           }
         }
       }
@@ -1450,13 +1380,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     private void recordDirectStreamPosition() throws IOException {
-      directStreamOutput.getPosition(rowIndexPosition);
-      directLengthOutput.getPosition(rowIndexPosition);
+      if (rowIndexPosition != null) { // the recorder is null when no row index is being built
+        directStreamOutput.getPosition(rowIndexPosition);
+        lengthOutput.getPosition(rowIndexPosition);
+      }
     }
 
     @Override
     long estimateMemory() {
-      return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+      long parent = super.estimateMemory(); // start from the superclass estimate
+      if (useDictionaryEncoding) { // dictionary mode: count the dictionary and the row list
+        return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
+      } else { // direct mode: count the length writer and the direct data stream buffer
+        return parent + lengthOutput.estimateMemory() +
+            directStreamOutput.getBufferSize();
+      }
     }
   }
 
@@ -1484,7 +1422,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             for(int i=0; i < length; ++i) {
               directStreamOutput.write(vec.vector[0], vec.start[0],
                   vec.length[0]);
-              directLengthOutput.write(vec.length[0]);
+              lengthOutput.write(vec.length[0]);
             }
           }
           indexStatistics.updateString(vec.vector[0], vec.start[0],
@@ -1507,7 +1445,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             } else {
               directStreamOutput.write(vec.vector[offset + i],
                   vec.start[offset + i], vec.length[offset + i]);
-              directLengthOutput.write(vec.length[offset + i]);
+              lengthOutput.write(vec.length[offset + i]);
             }
             indexStatistics.updateString(vec.vector[offset + i],
                 vec.start[offset + i], vec.length[offset + i], 1);
@@ -1570,7 +1508,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           } else {
             for(int i=0; i < length; ++i) {
               directStreamOutput.write(ptr, ptrOffset, itemLength);
-              directLengthOutput.write(itemLength);
+              lengthOutput.write(itemLength);
             }
           }
           indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
@@ -1603,7 +1541,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
               rows.add(dictionary.add(ptr, ptrOffset, itemLength));
             } else {
               directStreamOutput.write(ptr, ptrOffset, itemLength);
-              directLengthOutput.write(itemLength);
+              lengthOutput.write(itemLength);
             }
             indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
             if (createBloomFilter) {
@@ -1653,7 +1591,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             for(int i=0; i < length; ++i) {
               directStreamOutput.write(vec.vector[0], vec.start[0],
                   itemLength);
-              directLengthOutput.write(itemLength);
+              lengthOutput.write(itemLength);
             }
           }
           indexStatistics.updateString(vec.vector[0], vec.start[0],
@@ -1679,7 +1617,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             } else {
               directStreamOutput.write(vec.vector[offset + i],
                   vec.start[offset + i], itemLength);
-              directLengthOutput.write(itemLength);
+              lengthOutput.write(itemLength);
             }
             indexStatistics.updateString(vec.vector[offset + i],
                 vec.start[offset + i], itemLength, 1);
@@ -1714,7 +1652,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       this.isDirectV2 = isNewWriteFormat(writer);
       this.length = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1776,7 +1716,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.writeStripe(builder, requiredIndexEntries);
       stream.flush();
       length.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1785,6 +1727,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       stream.getPosition(recorder);
       length.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus the value stream buffer and the length writer
+      return super.estimateMemory() + stream.getBufferSize() +
+          length.estimateMemory();
+    }
   }
 
   public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
@@ -1809,7 +1757,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
       this.nanos = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
       // for unit tests to set different time zones
       this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
       writer.useWriterTimeZone(true);
@@ -1875,7 +1825,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.writeStripe(builder, requiredIndexEntries);
       seconds.flush();
       nanos.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     private static long formatNanos(int nanos) {
@@ -1900,6 +1852,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       seconds.getPosition(recorder);
       nanos.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus the seconds and nanos writers
+      return super.estimateMemory() + seconds.estimateMemory() +
+          nanos.estimateMemory();
+    }
   }
 
   private static class DateTreeWriter extends TreeWriter {
@@ -1915,7 +1873,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1959,7 +1919,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
       writer.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -1977,6 +1939,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       return OrcProto.ColumnEncoding.newBuilder()
           .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus what the integer writer reports
+      return super.estimateMemory() + writer.estimateMemory();
+    }
   }
 
   private static class DecimalTreeWriter extends TreeWriter {
@@ -2000,7 +1967,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
       this.scaleStream = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2060,7 +2029,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.writeStripe(builder, requiredIndexEntries);
       valueStream.flush();
       scaleStream.flush();
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2069,6 +2040,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       valueStream.getPosition(recorder);
       scaleStream.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate plus the value stream buffer and the scale writer
+      return super.estimateMemory() + valueStream.getBufferSize() +
+          scaleStream.estimateMemory();
+    }
   }
 
   private static class StructTreeWriter extends TreeWriter {
@@ -2084,7 +2061,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           children.get(i), writer,
           true);
       }
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2148,7 +2127,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       for(TreeWriter child: childrenWriters) {
         child.writeStripe(builder, requiredIndexEntries);
       }
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
   }
 
@@ -2167,7 +2148,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         createTreeWriter(schema.getChildren().get(0), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2243,7 +2226,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       for(TreeWriter child: childrenWriters) {
         child.writeStripe(builder, requiredIndexEntries);
       }
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2251,6 +2236,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       lengths.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate (which sums the children) plus the lengths writer
+      return super.estimateMemory() + lengths.estimateMemory();
+    }
   }
 
   private static class MapTreeWriter extends TreeWriter {
@@ -2271,7 +2261,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         createTreeWriter(children.get(1), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2352,7 +2344,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       for(TreeWriter child: childrenWriters) {
         child.writeStripe(builder, requiredIndexEntries);
       }
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2360,6 +2354,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       lengths.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate (which sums the children) plus the lengths writer
+      return super.estimateMemory() + lengths.estimateMemory();
+    }
   }
 
   private static class UnionTreeWriter extends TreeWriter {
@@ -2379,7 +2378,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       tags =
         new RunLengthByteWriter(writer.createStream(columnId,
             OrcProto.Stream.Kind.DATA));
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2451,7 +2452,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       for(TreeWriter child: childrenWriters) {
         child.writeStripe(builder, requiredIndexEntries);
       }
-      recordPosition(rowIndexPosition);
+      if (rowIndexPosition != null) {
+        recordPosition(rowIndexPosition);
+      }
     }
 
     @Override
@@ -2459,6 +2462,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       super.recordPosition(recorder);
       tags.getPosition(recorder);
     }
+
+    @Override
+    long estimateMemory() { // superclass estimate (which sums the children) plus the tags writer
+      return super.estimateMemory() + tags.estimateMemory();
+    }
   }
 
   private static TreeWriter createTreeWriter(TypeDescription schema,
@@ -2609,27 +2617,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  // @VisibleForTesting
-  public FSDataOutputStream getStream() throws IOException {
-    if (rawWriter == null) {
-      rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
-                            fs.getDefaultReplication(path), blockSize);
-      rawWriter.writeBytes(OrcFile.MAGIC);
-      headerLength = rawWriter.getPos();
-      writer = new OutStream("metadata", bufferSize, codec,
-                             new DirectStream(rawWriter));
-      protobufWriter = CodedOutputStream.newInstance(writer);
-    }
-    return rawWriter;
-  }
-
   private void createRowIndexEntry() throws IOException {
     treeWriter.createRowIndexEntry();
     rowsInIndex = 0;
   }
 
   private void flushStripe() throws IOException {
-    getStream();
     if (buildIndex && rowsInIndex != 0) {
       createRowIndexEntry();
     }
@@ -2643,95 +2636,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       OrcProto.StripeFooter.Builder builder =
           OrcProto.StripeFooter.newBuilder();
       treeWriter.writeStripe(builder, requiredIndexEntries);
-      long indexSize = 0;
-      long dataSize = 0;
-      for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
-        BufferedStream stream = pair.getValue();
-        if (!stream.isSuppressed()) {
-          stream.flush();
-          StreamName name = pair.getKey();
-          long streamSize = pair.getValue().getOutputSize();
-          builder.addStreams(OrcProto.Stream.newBuilder()
-                             .setColumn(name.getColumn())
-                             .setKind(name.getKind())
-                             .setLength(streamSize));
-          if (StreamName.Area.INDEX == name.getArea()) {
-            indexSize += streamSize;
-          } else {
-            dataSize += streamSize;
-          }
-        }
-      }
-      OrcProto.StripeFooter footer = builder.build();
-
-      // Do we need to pad the file so the stripe doesn't straddle a block
-      // boundary?
-      long start = rawWriter.getPos();
-      final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize();
-      final long available = blockSize - (start % blockSize);
-      final long overflow = currentStripeSize - adjustedStripeSize;
-      final float availRatio = (float) available / (float) defaultStripeSize;
-
-      if (availRatio > 0.0f && availRatio < 1.0f
-          && availRatio > paddingTolerance) {
-        // adjust default stripe size to fit into remaining space, also adjust
-        // the next stripe for correction based on the current stripe size
-        // and user specified padding tolerance. Since stripe size can overflow
-        // the default stripe size we should apply this correction to avoid
-        // writing portion of last stripe to next hdfs block.
-        double correction = overflow > 0 ? (double) overflow
-            / (double) adjustedStripeSize : 0.0;
-
-        // correction should not be greater than user specified padding
-        // tolerance
-        correction = correction > paddingTolerance ? paddingTolerance
-            : correction;
-
-        // adjust next stripe size based on current stripe estimate correction
-        adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
-      } else if (availRatio >= 1.0) {
-        adjustedStripeSize = defaultStripeSize;
-      }
-
-      if (availRatio < paddingTolerance && addBlockPadding) {
-        long padding = blockSize - (start % blockSize);
-        byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
-        LOG.info(String.format("Padding ORC by %d bytes (<=  %.2f * %d)", 
-            padding, availRatio, defaultStripeSize));
-        start += padding;
-        while (padding > 0) {
-          int writeLen = (int) Math.min(padding, pad.length);
-          rawWriter.write(pad, 0, writeLen);
-          padding -= writeLen;
-        }
-        adjustedStripeSize = defaultStripeSize;
-      } else if (currentStripeSize < blockSize
-          && (start % blockSize) + currentStripeSize > blockSize) {
-        // even if you don't pad, reset the default stripe size when crossing a
-        // block boundary
-        adjustedStripeSize = defaultStripeSize;
-      }
-
-      // write out the data streams
-      for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
-        BufferedStream stream = pair.getValue();
-        if (!stream.isSuppressed()) {
-          stream.spillTo(rawWriter);
-        }
-        stream.clear();
-      }
-      footer.writeTo(protobufWriter);
-      protobufWriter.flush();
-      writer.flush();
-      long footerLength = rawWriter.getPos() - start - dataSize - indexSize;
-      OrcProto.StripeInformation dirEntry =
+      OrcProto.StripeInformation.Builder dirEntry =
           OrcProto.StripeInformation.newBuilder()
-              .setOffset(start)
-              .setNumberOfRows(rowsInStripe)
-              .setIndexLength(indexSize)
-              .setDataLength(dataSize)
-              .setFooterLength(footerLength).build();
-      stripes.add(dirEntry);
+              .setNumberOfRows(rowsInStripe);
+      physicalWriter.finalizeStripe(builder, dirEntry);
+      stripes.add(dirEntry.build());
       rowCount += rowsInStripe;
       rowsInStripe = 0;
     }
@@ -2812,30 +2721,33 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private int writeMetadata() throws IOException {
-    getStream();
+  private void writeMetadata() throws IOException {
     OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
     for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
       builder.addStripeStats(ssb.build());
     }
+    physicalWriter.writeFileMetadata(builder);
+  }
 
-    long startPosn = rawWriter.getPos();
-    OrcProto.Metadata metadata = builder.build();
-    metadata.writeTo(protobufWriter);
-    protobufWriter.flush();
-    writer.flush();
-    return (int) (rawWriter.getPos() - startPosn);
+  private long writePostScript() throws IOException {
+    OrcProto.PostScript.Builder builder =
+        OrcProto.PostScript.newBuilder()
+            .setCompression(writeCompressionKind(compress))
+            .setMagic(OrcFile.MAGIC)
+            .addVersion(version.getMajor())
+            .addVersion(version.getMinor())
+            .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
+    if (compress != CompressionKind.NONE) {
+      builder.setCompressionBlockSize(bufferSize);
+    }
+    return physicalWriter.writePostScript(builder);
   }
 
-  private int writeFooter(long bodyLength) throws IOException {
-    getStream();
+  private long writeFooter() throws IOException {
+    writeMetadata();
     OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
-    builder.setContentLength(bodyLength);
-    builder.setHeaderLength(headerLength);
     builder.setNumberOfRows(rowCount);
     builder.setRowIndexStride(rowIndexStride);
-    // populate raw data size
-    rawDataSize = computeRawDataSize();
     // serialize the types
     writeTypes(builder, schema);
     // add the stripe information
@@ -2849,45 +2761,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
         .setName(entry.getKey()).setValue(entry.getValue()));
     }
-    long startPosn = rawWriter.getPos();
-    OrcProto.Footer footer = builder.build();
-    footer.writeTo(protobufWriter);
-    protobufWriter.flush();
-    writer.flush();
-    return (int) (rawWriter.getPos() - startPosn);
-  }
-
-  private int writePostScript(int footerLength, int metadataLength) throws IOException {
-    OrcProto.PostScript.Builder builder =
-      OrcProto.PostScript.newBuilder()
-        .setCompression(writeCompressionKind(compress))
-        .setFooterLength(footerLength)
-        .setMetadataLength(metadataLength)
-        .setMagic(OrcFile.MAGIC)
-        .addVersion(version.getMajor())
-        .addVersion(version.getMinor())
-        .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
-    if (compress != CompressionKind.NONE) {
-      builder.setCompressionBlockSize(bufferSize);
-    }
-    OrcProto.PostScript ps = builder.build();
-    // need to write this uncompressed
-    long startPosn = rawWriter.getPos();
-    ps.writeTo(rawWriter);
-    long length = rawWriter.getPos() - startPosn;
-    if (length > 255) {
-      throw new IllegalArgumentException("PostScript too large at " + length);
-    }
-    return (int) length;
-  }
-
-  private long estimateStripeSize() {
-    long result = 0;
-    for(BufferedStream stream: streams.values()) {
-      result += stream.getBufferSize();
-    }
-    result += treeWriter.estimateMemory();
-    return result;
+    physicalWriter.writeFileFooter(builder);
+    return writePostScript();
   }
 
   @Override
@@ -2933,11 +2808,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     memoryManager.removeWriter(path);
     // actually close the file
     flushStripe();
-    int metadataLength = writeMetadata();
-    int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
-    rawWriter.writeByte(writePostScript(footerLength, metadataLength));
-    rawWriter.close();
-
+    lastFlushOffset = writeFooter();
+    physicalWriter.close();
   }
 
   /**
@@ -2967,13 +2839,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       if (callback != null) {
         callback.preFooterWrite(callbackContext);
       }
-      int metaLength = writeMetadata();
-      int footLength = writeFooter(rawWriter.getPos() - metaLength);
-      rawWriter.writeByte(writePostScript(footLength, metaLength));
+      lastFlushOffset = writeFooter();
       stripesAtLastFlush = stripes.size();
-      rawWriter.hflush();
+      physicalWriter.flush();
     }
-    return rawWriter.getPos();
+    return lastFlushOffset;
   }
 
   static void checkArgument(boolean expression, String message) {
@@ -2993,28 +2863,15 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     checkArgument(stripeStatistics != null,
         "Stripe statistics must not be null");
 
-    getStream();
-    long start = rawWriter.getPos();
-    long availBlockSpace = blockSize - (start % blockSize);
-
-    // see if stripe can fit in the current hdfs block, else pad the remaining
-    // space in the block
-    if (length < blockSize && length > availBlockSpace &&
-        addBlockPadding) {
-      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
-      LOG.info(String.format("Padding ORC by %d bytes while merging..",
-          availBlockSpace));
-      start += availBlockSpace;
-      while (availBlockSpace > 0) {
-        int writeLen = (int) Math.min(availBlockSpace, pad.length);
-        rawWriter.write(pad, 0, writeLen);
-        availBlockSpace -= writeLen;
-      }
-    }
-
-    rawWriter.write(stripe);
-    rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
-    rowCount += rowsInStripe;
+    // update stripe information
+    OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
+        .newBuilder()
+        .setNumberOfRows(rowsInStripe)
+        .setIndexLength(stripeInfo.getIndexLength())
+        .setDataLength(stripeInfo.getDataLength())
+        .setFooterLength(stripeInfo.getFooterLength());
+    physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length),
+        dirEntry);
 
     // since we have already written the stripe, just update stripe statistics
     treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
@@ -3022,16 +2879,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // update file level statistics
     updateFileStatistics(stripeStatistics);
 
-    // update stripe information
-    OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
-        .newBuilder()
-        .setOffset(start)
-        .setNumberOfRows(rowsInStripe)
-        .setIndexLength(stripeInfo.getIndexLength())
-        .setDataLength(stripeInfo.getDataLength())
-        .setFooterLength(stripeInfo.getFooterLength())
-        .build();
-    stripes.add(dirEntry);
+    stripes.add(dirEntry.build());
 
     // reset it after writing the stripe
     rowsInStripe = 0;

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 2448cb7..7df521d 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -2117,8 +2117,8 @@ public class TestVectorOrcFile {
     int i = 0;
     for(StripeInformation stripe: reader.getStripes()) {
       i += 1;
-      assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
-          stripe.getDataLength() < 5000);
+      assertTrue(testFilePath + " stripe " + i + " is too long at " +
+              stripe.getDataLength(), stripe.getDataLength() < 5000);
     }
     // with HIVE-7832, the dictionaries will be disabled after writing the first
     // stripe as there are too many distinct values. Hence only 3 stripes as

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/test/org/apache/orc/impl/TestInStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index eea2ab2..d40676c 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -31,11 +31,12 @@ import java.util.List;
 
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.PhysicalWriter;
 import org.junit.Test;
 
 public class TestInStream {
 
-  static class OutputCollector implements OutStream.OutputReceiver {
+  static class OutputCollector implements PhysicalWriter.OutputReceiver {
     DynamicByteArray buffer = new DynamicByteArray();
 
     @Override
@@ -43,6 +44,11 @@ public class TestInStream {
       this.buffer.add(buffer.array(), buffer.arrayOffset() + buffer.position(),
           buffer.remaining());
     }
+
+    @Override
+    public void suppress() {
+      // PASS
+    }
   }
 
   static class PositionCollector

http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/test/org/apache/orc/impl/TestOutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index e9614d5..77aae06 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -19,6 +19,7 @@
 package org.apache.orc.impl;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.PhysicalWriter;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -30,8 +31,8 @@ public class TestOutStream {
 
   @Test
   public void testFlush() throws Exception {
-    OutStream.OutputReceiver receiver =
-        Mockito.mock(OutStream.OutputReceiver.class);
+    PhysicalWriter.OutputReceiver receiver =
+        Mockito.mock(PhysicalWriter.OutputReceiver.class);
     CompressionCodec codec = new ZlibCodec();
     OutStream stream = new OutStream("test", 128*1024, codec, receiver);
     assertEquals(0L, stream.getBufferSize());