You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/01/16 23:37:32 UTC
[1/2] orc git commit: ORC-119. Create an API to separate out layout
from the writer. (omalley)
Repository: orc
Updated Branches:
refs/heads/master 955190ffd -> ca7c97ebb
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
----------------------------------------------------------------------
diff --git a/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out b/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
index 4b0822f..b0315b4 100644
--- a/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ b/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -39,7 +39,7 @@ File Statistics:
Column 3: count: 21000 hasNull: false min: Darkness,-230 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936 sum: 6910238
Stripes:
- Stripe: offset: 3 data: 163602 rows: 5000 tail: 68 index: 720
+ Stripe: offset: 3 data: 163585 rows: 5000 tail: 68 index: 720
Stream: column 0 section ROW_INDEX start: 3 length 17
Stream: column 1 section ROW_INDEX start: 20 length 166
Stream: column 2 section ROW_INDEX start: 186 length 171
@@ -47,7 +47,7 @@ Stripes:
Stream: column 1 section DATA start: 723 length 20035
Stream: column 2 section DATA start: 20758 length 40050
Stream: column 3 section DATA start: 60808 length 99226
- Stream: column 3 section LENGTH start: 160034 length 4291
+ Stream: column 3 section LENGTH start: 160034 length 4274
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
@@ -70,15 +70,15 @@ Stripes:
Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660 sum: 75448 positions: 16464,3340,0,1554,14
Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788 sum: 104868 positions: 36532,964,0,2372,90
Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744 sum: 136158 positions: 63067,3432,0,3354,108
- Stripe: offset: 164393 data: 368335 rows: 5000 tail: 69 index: 956
- Stream: column 0 section ROW_INDEX start: 164393 length 17
- Stream: column 1 section ROW_INDEX start: 164410 length 157
- Stream: column 2 section ROW_INDEX start: 164567 length 166
- Stream: column 3 section ROW_INDEX start: 164733 length 616
- Stream: column 1 section DATA start: 165349 length 20035
- Stream: column 2 section DATA start: 185384 length 40050
- Stream: column 3 section DATA start: 225434 length 302715
- Stream: column 3 section LENGTH start: 528149 length 5535
+ Stripe: offset: 164376 data: 368332 rows: 5000 tail: 69 index: 956
+ Stream: column 0 section ROW_INDEX start: 164376 length 17
+ Stream: column 1 section ROW_INDEX start: 164393 length 157
+ Stream: column 2 section ROW_INDEX start: 164550 length 166
+ Stream: column 3 section ROW_INDEX start: 164716 length 616
+ Stream: column 1 section DATA start: 165332 length 20035
+ Stream: column 2 section DATA start: 185367 length 40050
+ Stream: column 3 section DATA start: 225417 length 302715
+ Stream: column 3 section LENGTH start: 528132 length 5532
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
@@ -101,15 +101,15 @@ Stripes:
Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988 sum: 224740 positions: 94117,3404,0,1945,222
Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984 sum: 252094 positions: 155111,2864,0,3268,48
Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938 sum: 281404 positions: 224570,1006,0,4064,342
- Stripe: offset: 533753 data: 606074 rows: 5000 tail: 69 index: 1427
- Stream: column 0 section ROW_INDEX start: 533753 length 17
- Stream: column 1 section ROW_INDEX start: 533770 length 167
- Stream: column 2 section ROW_INDEX start: 533937 length 168
- Stream: column 3 section ROW_INDEX start: 534105 length 1075
- Stream: column 1 section DATA start: 535180 length 20035
- Stream: column 2 section DATA start: 555215 length 40050
- Stream: column 3 section DATA start: 595265 length 540210
- Stream: column 3 section LENGTH start: 1135475 length 5779
+ Stripe: offset: 533733 data: 606071 rows: 5000 tail: 69 index: 1427
+ Stream: column 0 section ROW_INDEX start: 533733 length 17
+ Stream: column 1 section ROW_INDEX start: 533750 length 167
+ Stream: column 2 section ROW_INDEX start: 533917 length 168
+ Stream: column 3 section ROW_INDEX start: 534085 length 1075
+ Stream: column 1 section DATA start: 535160 length 20035
+ Stream: column 2 section DATA start: 555195 length 40050
+ Stream: column 3 section DATA start: 595245 length 540210
+ Stream: column 3 section LENGTH start: 1135455 length 5776
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
@@ -132,15 +132,15 @@ Stripes:
Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976 sum: 386538 posit
ions: 185635,3966,0,2077,162
Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802
-12976-13216-13246-13502-13766 sum: 421660 positions: 295550,1384,0,3369,16
Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298
-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974 sum: 453606 positions: 412768,1156,0,4041,470
- Stripe: offset: 1141323 data: 864001 rows: 5000 tail: 69 index: 1975
- Stream: column 0 section ROW_INDEX start: 1141323 length 17
- Stream: column 1 section ROW_INDEX start: 1141340 length 156
- Stream: column 2 section ROW_INDEX start: 1141496 length 168
- Stream: column 3 section ROW_INDEX start: 1141664 length 1634
- Stream: column 1 section DATA start: 1143298 length 20035
- Stream: column 2 section DATA start: 1163333 length 40050
- Stream: column 3 section DATA start: 1203383 length 798014
- Stream: column 3 section LENGTH start: 2001397 length 5902
+ Stripe: offset: 1141300 data: 863962 rows: 5000 tail: 69 index: 1975
+ Stream: column 0 section ROW_INDEX start: 1141300 length 17
+ Stream: column 1 section ROW_INDEX start: 1141317 length 156
+ Stream: column 2 section ROW_INDEX start: 1141473 length 168
+ Stream: column 3 section ROW_INDEX start: 1141641 length 1634
+ Stream: column 1 section DATA start: 1143275 length 20035
+ Stream: column 2 section DATA start: 1163310 length 40050
+ Stream: column 3 section DATA start: 1203360 length 798014
+ Stream: column 3 section LENGTH start: 2001374 length 5863
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
@@ -163,15 +163,15 @@ Stripes:
Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9
650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878 sum: 568274 positions: 286457,302,0,1926,462
Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-91
28-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788 sum: 594578 positions: 447943,3328,0,3444,250
Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214-18444-18446-18724-18912-18952-19164 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8
390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904 sum: 631944 positions: 616471,3986,3778,547,292
- Stripe: offset: 2007368 data: 207295 rows: 1000 tail: 67 index: 841
- Stream: column 0 section ROW_INDEX start: 2007368 length 12
- Stream: column 1 section ROW_INDEX start: 2007380 length 38
- Stream: column 2 section ROW_INDEX start: 2007418 length 41
- Stream: column 3 section ROW_INDEX start: 2007459 length 750
- Stream: column 1 section DATA start: 2008209 length 4007
- Stream: column 2 section DATA start: 2012216 length 8010
- Stream: column 3 section DATA start: 2020226 length 194018
- Stream: column 3 section LENGTH start: 2214244 length 1260
+ Stripe: offset: 2007306 data: 207282 rows: 1000 tail: 67 index: 841
+ Stream: column 0 section ROW_INDEX start: 2007306 length 12
+ Stream: column 1 section ROW_INDEX start: 2007318 length 38
+ Stream: column 2 section ROW_INDEX start: 2007356 length 41
+ Stream: column 3 section ROW_INDEX start: 2007397 length 750
+ Stream: column 1 section DATA start: 2008147 length 4007
+ Stream: column 2 section DATA start: 2012154 length 8010
+ Stream: column 3 section DATA start: 2020164 length 194018
+ Stream: column 3 section LENGTH start: 2214182 length 1247
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
@@ -183,7 +183,7 @@ Stripes:
Row group indices for column 3:
Entry 0: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214-18444-18446-18724-18912-18952-19164-19348-19400-19546-19776-19896-20084 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-
7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936 sum: 670762 positions: 0,0,0,0,0
-File length: 2217685 bytes
+File length: 2217611 bytes
Padding length: 0 bytes
Padding ratio: 0%
________________________________________________________________________________________________________________________
[2/2] orc git commit: ORC-119. Create an API to separate out layout
from the writer. (omalley)
Posted by om...@apache.org.
ORC-119. Create an API to separate out layout from the writer. (omalley)
Fixes #76
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/ca7c97eb
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/ca7c97eb
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/ca7c97eb
Branch: refs/heads/master
Commit: ca7c97ebb7e5e97a9bc45894d377baa09f637e27
Parents: 955190f
Author: Owen O'Malley <om...@apache.org>
Authored: Fri Dec 9 13:30:56 2016 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jan 16 15:33:42 2017 -0800
----------------------------------------------------------------------
java/core/src/java/org/apache/orc/OrcFile.java | 18 +
.../src/java/org/apache/orc/PhysicalWriter.java | 133 ++++
.../org/apache/orc/impl/BitFieldWriter.java | 4 +
.../java/org/apache/orc/impl/IntegerWriter.java | 6 +
.../src/java/org/apache/orc/impl/OutStream.java | 52 +-
.../org/apache/orc/impl/PhysicalFsWriter.java | 386 ++++++++++
.../apache/orc/impl/RunLengthByteWriter.java | 4 +
.../apache/orc/impl/RunLengthIntegerWriter.java | 4 +
.../orc/impl/RunLengthIntegerWriterV2.java | 5 +
.../java/org/apache/orc/impl/WriterImpl.java | 740 ++++++++-----------
.../test/org/apache/orc/TestVectorOrcFile.java | 4 +-
.../test/org/apache/orc/impl/TestInStream.java | 8 +-
.../test/org/apache/orc/impl/TestOutStream.java | 5 +-
.../orc-file-dump-dictionary-threshold.out | 78 +-
14 files changed, 923 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index 7551c2e..68e49f3 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -298,6 +298,7 @@ public class OrcFile {
private String bloomFilterColumns;
private double bloomFilterFpp;
private BloomFilterVersion bloomFilterVersion;
+ private PhysicalWriter physicalWriter;
protected WriterOptions(Properties tableProperties, Configuration conf) {
configuration = conf;
@@ -487,6 +488,19 @@ public class OrcFile {
}
/**
+ * Change the physical writer of the ORC file.
+ *
+ * SHOULD ONLY BE USED BY LLAP.
+ *
+ * @param writer the writer to control the layout and persistence
+ * @return this
+ */
+ public WriterOptions physicalWriter(PhysicalWriter writer) {
+ this.physicalWriter = writer;
+ return this;
+ }
+
+ /**
* A package local option to set the memory manager.
*/
protected WriterOptions memory(MemoryManager value) {
@@ -569,6 +583,10 @@ public class OrcFile {
public BloomFilterVersion getBloomFilterVersion() {
return bloomFilterVersion;
}
+
+ public PhysicalWriter getPhysicalWriter() {
+ return physicalWriter;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java
new file mode 100644
index 0000000..9953d41
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.orc.impl.StreamName;
+
+/**
+ * This interface separates the physical layout of ORC files from the higher
+ * level details.
+ *
+ * This API is limited to being used by LLAP.
+ */
+public interface PhysicalWriter {
+
+ /**
+ * The target of an output stream.
+ */
+ interface OutputReceiver {
+ /**
+ * Output the given buffer to the final destination
+ *
+ * @param buffer the buffer to output
+ * @throws IOException
+ */
+ void output(ByteBuffer buffer) throws IOException;
+
+ /**
+ * Suppress this stream from being written to the stripe.
+ */
+ void suppress();
+ }
+ /**
+ * Writes the header of the file, which consists of the magic "ORC" bytes.
+ * @throws IOException
+ */
+ void writeHeader() throws IOException;
+
+ /**
+ * Create an OutputReceiver for the given name.
+ * @param name the name of the stream
+ * @throws IOException
+ */
+ OutputReceiver createDataStream(StreamName name) throws IOException;
+
+ /**
+ * Write an index in the given stream name.
+ * @param name the name of the stream
+ * @param index the bloom filter to write
+ * @param codec the compression codec to use
+ */
+ void writeIndex(StreamName name,
+ OrcProto.RowIndex.Builder index,
+ CompressionCodec codec) throws IOException;
+
+ /**
+ * Write a bloom filter index in the given stream name.
+ * @param name the name of the stream
+ * @param bloom the bloom filter to write
+ * @param codec the compression codec to use
+ */
+ void writeBloomFilter(StreamName name,
+ OrcProto.BloomFilterIndex.Builder bloom,
+ CompressionCodec codec) throws IOException;
+
+ /**
+ * Flushes the data in all the streams, spills them to disk, write out stripe
+ * footer.
+ * @param footer Stripe footer to be updated with relevant data and written out.
+ * @param dirEntry File metadata entry for the stripe, to be updated with
+ * relevant data.
+ */
+ void finalizeStripe(OrcProto.StripeFooter.Builder footer,
+ OrcProto.StripeInformation.Builder dirEntry) throws IOException;
+
+ /**
+ * Writes out the file metadata.
+ * @param builder Metadata builder to finalize and write.
+ */
+ void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException;
+
+ /**
+ * Writes out the file footer.
+ * @param builder Footer builder to finalize and write.
+ */
+ void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException;
+
+ /**
+ * Writes out the postscript (including the size byte if needed).
+ * @param builder Postscript builder to finalize and write.
+ */
+ long writePostScript(OrcProto.PostScript.Builder builder) throws IOException;
+
+ /**
+ * Closes the writer.
+ */
+ void close() throws IOException;
+
+ /**
+ * Flushes the writer so that readers can see the preceding postscripts.
+ */
+ void flush() throws IOException;
+
+ /**
+ * Appends raw stripe data (e.g. for file merger).
+ * @param stripe Stripe data buffer.
+ * @param dirEntry File metadata entry for the stripe, to be updated with
+ * relevant data.
+ * @throws IOException
+ */
+ void appendRawStripe(ByteBuffer stripe,
+ OrcProto.StripeInformation.Builder dirEntry
+ ) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java b/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
index aa5f886..3c8070f 100644
--- a/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
@@ -70,4 +70,8 @@ public class BitFieldWriter {
output.getPosition(recorder);
recorder.addPosition(8 - bitsLeft);
}
+
+ public long estimateMemory() {
+ return output.estimateMemory();
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
index 419054f..70b16d3 100644
--- a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
@@ -44,4 +44,10 @@ public interface IntegerWriter {
* @throws IOException
*/
void flush() throws IOException;
+
+ /**
+ * Estimate the amount of memory being used.
+ * @return number of bytes
+ */
+ long estimateMemory();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index 81662cc..a1131e4 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -18,26 +18,16 @@
package org.apache.orc.impl;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.PhysicalWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
public class OutStream extends PositionedOutputStream {
- public interface OutputReceiver {
- /**
- * Output the given buffer to the final destination
- * @param buffer the buffer to output
- * @throws IOException
- */
- void output(ByteBuffer buffer) throws IOException;
- }
-
public static final int HEADER_SIZE = 3;
private final String name;
- private final OutputReceiver receiver;
- // if enabled the stream will be suppressed when writing stripe
- private boolean suppress;
+ private final PhysicalWriter.OutputReceiver receiver;
/**
* Stores the uncompressed bytes that have been serialized, but not
@@ -69,17 +59,15 @@ public class OutStream extends PositionedOutputStream {
public OutStream(String name,
int bufferSize,
CompressionCodec codec,
- OutputReceiver receiver) throws IOException {
+ PhysicalWriter.OutputReceiver receiver) throws IOException {
this.name = name;
this.bufferSize = bufferSize;
this.codec = codec;
this.receiver = receiver;
- this.suppress = false;
}
public void clear() throws IOException {
flush();
- suppress = false;
}
/**
@@ -258,32 +246,28 @@ public class OutStream extends PositionedOutputStream {
@Override
public long getBufferSize() {
- long result = 0;
- if (current != null) {
- result += current.capacity();
- }
- if (compressed != null) {
- result += compressed.capacity();
- }
- if (overflow != null) {
- result += overflow.capacity();
+ if (codec == null) {
+ return uncompressedBytes + (current == null ? 0 : current.remaining());
+ } else {
+ long result = 0;
+ if (current != null) {
+ result += current.capacity();
+ }
+ if (compressed != null) {
+ result += compressed.capacity();
+ }
+ if (overflow != null) {
+ result += overflow.capacity();
+ }
+ return result + compressedBytes;
}
- return result;
}
/**
* Set suppress flag
*/
public void suppress() {
- suppress = true;
- }
-
- /**
- * Returns the state of suppress flag
- * @return value of suppress flag
- */
- public boolean isSuppressed() {
- return suppress;
+ receiver.suppress();
}
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 0000000..48a0b42
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
/**
 * A {@link PhysicalWriter} that lays an ORC file out on a Hadoop
 * {@link FileSystem}. Stream contents for the current stripe are buffered in
 * memory (one {@link BufferedStream} per {@link StreamName}) until
 * {@link #finalizeStripe} spills them to the raw output. Stripes are
 * optionally padded so they do not straddle HDFS block boundaries, and the
 * file metadata, footer, and postscript are written through a compressed
 * protobuf stream.
 */
public class PhysicalFsWriter implements PhysicalWriter {
  private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);

  // Buffer size passed to FileSystem.create; also bounds the size of the
  // zero-filled array used when padding out to a block boundary.
  private static final int HDFS_BUFFER_SIZE = 256 * 1024;

  // the underlying file stream; all bytes ultimately go through here
  private final FSDataOutputStream rawWriter;
  // the compressed metadata information outStream
  private OutStream writer = null;
  // a protobuf outStream around streamFactory
  private CodedOutputStream protobufWriter = null;

  private final Path path;
  private final long blockSize;
  private final int bufferSize;
  private final double paddingTolerance;
  private final long defaultStripeSize;
  private final CompressionKind compress;
  private final boolean addBlockPadding;

  // the streams that make up the current stripe, ordered by StreamName
  private final Map<StreamName, BufferedStream> streams =
      new TreeMap<>();

  // stripe size target, corrected as stripes over/undershoot block boundaries
  private long adjustedStripeSize;
  private long headerLength;       // bytes before the first stripe (the magic)
  private long stripeStart;        // file offset of the stripe being written
  private int metadataLength;      // serialized size of the file metadata
  private int footerLength;        // serialized size of the file footer

  /**
   * Create a physical writer for the given path.
   *
   * @param fs the file system to create the file on
   * @param path the file to write
   * @param opts writer options (stripe size, block size, compression, padding)
   * @throws IOException if the file can't be created
   */
  public PhysicalFsWriter(FileSystem fs,
                          Path path,
                          OrcFile.WriterOptions opts) throws IOException {
    this.path = path;
    this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
    this.addBlockPadding = opts.getBlockPadding();
    if (opts.isEnforceBufferSize()) {
      this.bufferSize = opts.getBufferSize();
    } else {
      // shrink the buffer when there are many columns, so that total memory
      // stays proportional to the stripe size
      this.bufferSize = WriterImpl.getEstimatedBufferSize(defaultStripeSize,
          opts.getSchema().getMaximumId() + 1,
          opts.getBufferSize());
    }
    this.compress = opts.getCompress();
    this.paddingTolerance = opts.getPaddingTolerance();
    this.blockSize = opts.getBlockSize();
    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
        " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
        compress, bufferSize);
    rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
        fs.getDefaultReplication(path), blockSize);
    CompressionCodec codec = WriterImpl.createCodec(compress);
    // metadata/footer bytes bypass stripe buffering and go straight to disk
    writer = new OutStream("metadata", bufferSize, codec,
        new DirectStream(rawWriter));
    protobufWriter = CodedOutputStream.newInstance(writer);
  }

  /**
   * Record the stripe start offset and, if the stripe would straddle an HDFS
   * block boundary, either pad out to the boundary or adjust the target size
   * of the next stripe.
   *
   * @param indexSize bytes of index streams in the stripe
   * @param dataSize bytes of data streams in the stripe
   * @param footerSize serialized size of the stripe footer
   */
  private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
    this.stripeStart = rawWriter.getPos();
    final long currentStripeSize = indexSize + dataSize + footerSize;
    final long available = blockSize - (stripeStart % blockSize);
    final long overflow = currentStripeSize - adjustedStripeSize;
    final float availRatio = (float) available / (float) defaultStripeSize;

    if (availRatio > 0.0f && availRatio < 1.0f
        && availRatio > paddingTolerance) {
      // adjust default stripe size to fit into remaining space, also adjust
      // the next stripe for correction based on the current stripe size
      // and user specified padding tolerance. Since stripe size can overflow
      // the default stripe size we should apply this correction to avoid
      // writing portion of last stripe to next hdfs block.
      double correction = overflow > 0 ? (double) overflow
          / (double) adjustedStripeSize : 0.0;

      // correction should not be greater than user specified padding
      // tolerance
      correction = correction > paddingTolerance ? paddingTolerance
          : correction;

      // adjust next stripe size based on current stripe estimate correction
      adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
    } else if (availRatio >= 1.0) {
      adjustedStripeSize = defaultStripeSize;
    }

    if (availRatio < paddingTolerance && addBlockPadding) {
      // the remaining block space is below tolerance: fill it with zeros so
      // the stripe starts on the next block boundary
      long padding = blockSize - (stripeStart % blockSize);
      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
      LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
          padding, availRatio, defaultStripeSize));
      stripeStart += padding;
      while (padding > 0) {
        int writeLen = (int) Math.min(padding, pad.length);
        rawWriter.write(pad, 0, writeLen);
        padding -= writeLen;
      }
      adjustedStripeSize = defaultStripeSize;
    } else if (currentStripeSize < blockSize
        && (stripeStart % blockSize) + currentStripeSize > blockSize) {
      // even if you don't pad, reset the default stripe size when crossing a
      // block boundary
      adjustedStripeSize = defaultStripeSize;
    }
  }

  /**
   * An output receiver that writes the ByteBuffers to the output stream
   * as they are received.
   */
  // NOTE(review): holds no reference to the enclosing writer — could be a
  // static nested class.
  private class DirectStream implements OutputReceiver {
    private final FSDataOutputStream output;

    DirectStream(FSDataOutputStream output) {
      this.output = output;
    }

    @Override
    public void output(ByteBuffer buffer) throws IOException {
      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
          buffer.remaining());
    }

    @Override
    public void suppress() {
      // direct streams have already written their bytes; they can't be undone
      throw new UnsupportedOperationException("Can't suppress direct stream");
    }
  }

  /**
   * Serialize the stripe footer after the stripe body and fill in the
   * directory entry's offset and footer length.
   */
  private void writeStripeFooter(OrcProto.StripeFooter footer,
                                 long dataSize,
                                 long indexSize,
                                 OrcProto.StripeInformation.Builder dirEntry) throws IOException {
    footer.writeTo(protobufWriter);
    protobufWriter.flush();
    writer.flush();
    dirEntry.setOffset(stripeStart);
    // footer length = bytes written since the stripe started, minus its body
    dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize);
  }

  /**
   * Write the file metadata section (stripe statistics) and remember its
   * serialized length for the postscript.
   */
  @Override
  public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
    long startPosn = rawWriter.getPos();
    OrcProto.Metadata metadata = builder.build();
    metadata.writeTo(protobufWriter);
    protobufWriter.flush();
    writer.flush();
    this.metadataLength = (int) (rawWriter.getPos() - startPosn);
  }

  /**
   * Write the file footer, setting the content and header lengths first, and
   * remember its serialized length for the postscript.
   */
  @Override
  public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
    // the body is everything before the metadata section
    long bodyLength = rawWriter.getPos() - metadataLength;
    builder.setContentLength(bodyLength);
    builder.setHeaderLength(headerLength);
    long startPosn = rawWriter.getPos();
    OrcProto.Footer footer = builder.build();
    footer.writeTo(protobufWriter);
    protobufWriter.flush();
    writer.flush();
    this.footerLength = (int) (rawWriter.getPos() - startPosn);
  }

  /**
   * Write the uncompressed postscript followed by its one-byte length.
   *
   * @return the total file length after the postscript
   * @throws IllegalArgumentException if the postscript exceeds 255 bytes,
   *         which can't be represented in the trailing length byte
   */
  @Override
  public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
    builder.setFooterLength(footerLength);
    builder.setMetadataLength(metadataLength);
    if (compress != CompressionKind.NONE) {
      builder.setCompressionBlockSize(bufferSize);
    }
    OrcProto.PostScript ps = builder.build();
    // need to write this uncompressed
    long startPosn = rawWriter.getPos();
    ps.writeTo(rawWriter);
    long length = rawWriter.getPos() - startPosn;
    if (length > 255) {
      throw new IllegalArgumentException("PostScript too large at " + length);
    }
    rawWriter.writeByte((int)length);
    return rawWriter.getPos();
  }

  @Override
  public void close() throws IOException {
    rawWriter.close();
  }

  @Override
  public void flush() throws IOException {
    // hflush pushes buffered bytes to the datanodes without closing the file
    rawWriter.hflush();
  }

  /**
   * Append an already-serialized stripe (e.g. from a file merge), padding to
   * the next block boundary first if the stripe would otherwise straddle one.
   *
   * @param buffer the raw stripe bytes
   * @param dirEntry directory entry to receive the stripe's file offset
   */
  @Override
  public void appendRawStripe(ByteBuffer buffer,
      OrcProto.StripeInformation.Builder dirEntry) throws IOException {
    long start = rawWriter.getPos();
    int length = buffer.remaining();
    long availBlockSpace = blockSize - (start % blockSize);

    // see if stripe can fit in the current hdfs block, else pad the remaining
    // space in the block
    if (length < blockSize && length > availBlockSpace &&
        addBlockPadding) {
      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
      LOG.info(String.format("Padding ORC by %d bytes while merging..",
          availBlockSpace));
      start += availBlockSpace;
      while (availBlockSpace > 0) {
        int writeLen = (int) Math.min(availBlockSpace, pad.length);
        rawWriter.write(pad, 0, writeLen);
        availBlockSpace -= writeLen;
      }
    }
    rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
        length);
    dirEntry.setOffset(start);
  }


  /**
   * This class is used to hold the contents of streams as they are buffered.
   * The TreeWriters write to the outStream and the codec compresses the
   * data as buffers fill up and stores them in the output list. When the
   * stripe is being written, the whole stream is written to the file.
   */
  private static final class BufferedStream implements OutputReceiver {
    private boolean isSuppressed = false;
    private final List<ByteBuffer> output = new ArrayList<>();

    @Override
    public void output(ByteBuffer buffer) {
      // once suppressed, incoming buffers are silently dropped
      if (!isSuppressed) {
        output.add(buffer);
      }
    }

    /** Discard everything buffered so far and drop future buffers. */
    public void suppress() {
      isSuppressed = true;
      output.clear();
    }

    /**
     * Write any saved buffers to the OutputStream if needed, and clears all the
     * buffers.
     */
    void spillToDiskAndClear(FSDataOutputStream raw
                             ) throws IOException {
      if (!isSuppressed) {
        for (ByteBuffer buffer: output) {
          raw.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
              buffer.remaining());
        }
        output.clear();
      }
      // reset so the stream can be reused for the next stripe
      isSuppressed = false;
    }

    /**
     * Get the number of bytes that will be written to the output.
     *
     * Assumes the stream writing into this receiver has already been flushed.
     * @return number of bytes
     */
    public long getOutputSize() {
      long result = 0;
      for (ByteBuffer buffer: output) {
        result += buffer.remaining();
      }
      return result;
    }
  }

  /**
   * Finish the current stripe: record each unsuppressed stream in the footer,
   * pad to a block boundary if needed, spill the buffered streams to disk,
   * and write the stripe footer.
   *
   * @param footerBuilder the stripe footer being assembled
   * @param dirEntry directory entry to receive the stripe's sizes and offset
   */
  @Override
  public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
                             OrcProto.StripeInformation.Builder dirEntry
                             ) throws IOException {
    long indexSize = 0;
    long dataSize = 0;
    for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
      BufferedStream receiver = pair.getValue();
      if (!receiver.isSuppressed) {
        long streamSize = receiver.getOutputSize();
        StreamName name = pair.getKey();
        footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
            .setKind(name.getKind()).setLength(streamSize));
        if (StreamName.Area.INDEX == name.getArea()) {
          indexSize += streamSize;
        } else {
          dataSize += streamSize;
        }
      }
    }
    dirEntry.setIndexLength(indexSize).setDataLength(dataSize);

    OrcProto.StripeFooter footer = footerBuilder.build();
    // Do we need to pad the file so the stripe doesn't straddle a block boundary?
    padStripe(indexSize, dataSize, footer.getSerializedSize());

    // write out the data streams
    for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
      pair.getValue().spillToDiskAndClear(rawWriter);
    }
    // Write out the footer.
    writeStripeFooter(footer, dataSize, indexSize, dirEntry);
  }

  /** Write the ORC magic bytes and record the header length. */
  @Override
  public void writeHeader() throws IOException {
    rawWriter.writeBytes(OrcFile.MAGIC);
    headerLength = rawWriter.getPos();
  }

  /**
   * Get (or lazily create) the buffered receiver for the given stream name.
   */
  @Override
  public BufferedStream createDataStream(StreamName name) {
    BufferedStream result = streams.get(name);
    if (result == null) {
      result = new BufferedStream();
      streams.put(name, result);
    }
    return result;
  }

  /**
   * Serialize a row index into the named stream, compressing with the given
   * codec.
   */
  @Override
  public void writeIndex(StreamName name,
                         OrcProto.RowIndex.Builder index,
                         CompressionCodec codec) throws IOException {
    OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
        createDataStream(name));
    index.build().writeTo(stream);
    stream.flush();
  }

  /**
   * Serialize a bloom filter index into the named stream, compressing with
   * the given codec.
   */
  @Override
  public void writeBloomFilter(StreamName name,
                               OrcProto.BloomFilterIndex.Builder bloom,
                               CompressionCodec codec) throws IOException {
    OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
        createDataStream(name));
    bloom.build().writeTo(stream);
    stream.flush();
  }

  @Override
  public String toString() {
    return path.toString();
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
index 09108b2..c2f1fa7 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
@@ -103,4 +103,8 @@ public class RunLengthByteWriter {
output.getPosition(recorder);
recorder.addPosition(numLiterals);
}
+
+ public long estimateMemory() {
+ return output.getBufferSize() + MAX_LITERAL_SIZE;
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
index 3e5f2e2..88b47e6 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
@@ -140,4 +140,8 @@ public class RunLengthIntegerWriter implements IntegerWriter {
recorder.addPosition(numLiterals);
}
+ @Override
+ public long estimateMemory() {
+ return output.getBufferSize();
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
index 183fd8e..29cbebf 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
@@ -818,4 +818,9 @@ public class RunLengthIntegerWriterV2 implements IntegerWriter {
output.getPosition(recorder);
recorder.addPosition(numLiterals);
}
+
+ @Override
+ public long estimateMemory() {
+ return output.getBufferSize();
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 22ff867..c364ca0 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -19,7 +19,6 @@
package org.apache.orc.impl;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
@@ -39,23 +38,23 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.orc.BinaryColumnStatistics;
import org.apache.orc.ColumnStatistics;
-import org.apache.orc.util.BloomFilter;
-import org.apache.orc.util.BloomFilterIO;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
+import org.apache.orc.PhysicalWriter;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
import org.apache.orc.util.BloomFilterUtf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -73,7 +72,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.Text;
import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -98,40 +96,28 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
- private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
// threshold above which buffer size will be automatically resized
private static final int COLUMN_COUNT_THRESHOLD = 1000;
- private final FileSystem fs;
private final Path path;
private final long defaultStripeSize;
private long adjustedStripeSize;
private final int rowIndexStride;
private final CompressionKind compress;
private final CompressionCodec codec;
- private final boolean addBlockPadding;
private final int bufferSize;
private final long blockSize;
- private final double paddingTolerance;
private final TypeDescription schema;
+ private final PhysicalWriter physicalWriter;
- // the streams that make up the current stripe
- private final Map<StreamName, BufferedStream> streams =
- new TreeMap<StreamName, BufferedStream>();
-
- private FSDataOutputStream rawWriter = null;
- // the compressed metadata information outStream
- private OutStream writer = null;
- // a protobuf outStream around streamFactory
- private CodedOutputStream protobufWriter = null;
- private long headerLength;
private int columnCount;
private long rowCount = 0;
private long rowsInStripe = 0;
private long rawDataSize = 0;
private int rowsInIndex = 0;
+ private long lastFlushOffset = 0;
private int stripesAtLastFlush = -1;
private final List<OrcProto.StripeInformation> stripes =
new ArrayList<OrcProto.StripeInformation>();
@@ -155,7 +141,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
public WriterImpl(FileSystem fs,
Path path,
OrcFile.WriterOptions opts) throws IOException {
- this.fs = fs;
this.path = path;
this.conf = opts.getConfiguration();
this.callback = opts.getCallback();
@@ -177,9 +162,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
this.compressionStrategy = opts.getCompressionStrategy();
- this.addBlockPadding = opts.getBlockPadding();
this.blockSize = opts.getBlockSize();
- this.paddingTolerance = opts.getPaddingTolerance();
this.compress = opts.getCompress();
this.rowIndexStride = opts.getRowIndexStride();
this.memoryManager = opts.getMemoryManager();
@@ -200,6 +183,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
}
this.bloomFilterFpp = opts.getBloomFilterFpp();
+ this.physicalWriter = opts.getPhysicalWriter() == null ?
+ new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
+ physicalWriter.writeHeader();
treeWriter = createTreeWriter(schema, streamFactory, false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
@@ -273,10 +259,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
@Override
public boolean checkMemory(double newScale) throws IOException {
long limit = (long) Math.round(adjustedStripeSize * newScale);
- long size = estimateStripeSize();
+ long size = treeWriter.estimateMemory();
if (LOG.isDebugEnabled()) {
- LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
- limit);
+ LOG.debug("ORC writer " + physicalWriter + " size = " + size +
+ " limit = " + limit);
}
if (size > limit) {
flushStripe();
@@ -285,116 +271,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return false;
}
- /**
- * This class is used to hold the contents of streams as they are buffered.
- * The TreeWriters write to the outStream and the codec compresses the
- * data as buffers fill up and stores them in the output list. When the
- * stripe is being written, the whole stream is written to the file.
- */
- private class BufferedStream implements OutStream.OutputReceiver {
- private final OutStream outStream;
- private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
- BufferedStream(String name, int bufferSize,
- CompressionCodec codec) throws IOException {
- outStream = new OutStream(name, bufferSize, codec, this);
- }
-
- /**
- * Receive a buffer from the compression codec.
- * @param buffer the buffer to save
- */
- @Override
- public void output(ByteBuffer buffer) {
- output.add(buffer);
- }
-
- /**
- * Get the number of bytes in buffers that are allocated to this stream.
- * @return number of bytes in buffers
- */
- public long getBufferSize() {
- long result = 0;
- for(ByteBuffer buf: output) {
- result += buf.capacity();
- }
- return outStream.getBufferSize() + result;
- }
-
- /**
- * Flush the stream to the codec.
- * @throws IOException
- */
- public void flush() throws IOException {
- outStream.flush();
- }
-
- /**
- * Clear all of the buffers.
- * @throws IOException
- */
- public void clear() throws IOException {
- outStream.clear();
- output.clear();
- }
-
- /**
- * Check the state of suppress flag in output stream
- * @return value of suppress flag
- */
- public boolean isSuppressed() {
- return outStream.isSuppressed();
- }
-
- /**
- * Get the number of bytes that will be written to the output. Assumes
- * the stream has already been flushed.
- * @return the number of bytes
- */
- public long getOutputSize() {
- long result = 0;
- for(ByteBuffer buffer: output) {
- result += buffer.remaining();
- }
- return result;
- }
-
- /**
- * Write the saved compressed buffers to the OutputStream.
- * @param out the stream to write to
- * @throws IOException
- */
- void spillTo(OutputStream out) throws IOException {
- for(ByteBuffer buffer: output) {
- out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
- @Override
- public String toString() {
- return outStream.toString();
- }
- }
-
- /**
- * An output receiver that writes the ByteBuffers to the output stream
- * as they are received.
- */
- private class DirectStream implements OutStream.OutputReceiver {
- private final FSDataOutputStream output;
-
- DirectStream(FSDataOutputStream output) {
- this.output = output;
- }
-
- @Override
- public void output(ByteBuffer buffer) throws IOException {
- output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
private static class RowIndexPositionRecorder implements PositionRecorder {
private final OrcProto.RowIndexEntry.Builder builder;
@@ -408,35 +284,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- /**
- * Interface from the Writer to the TreeWriters. This limits the visibility
- * that the TreeWriters have into the Writer.
- */
- private class StreamFactory {
- /**
- * Create a stream to store part of a column.
- * @param column the column id for the stream
- * @param kind the kind of stream
- * @return The output outStream that the section needs to be written to.
- * @throws IOException
- */
- public OutStream createStream(int column,
- OrcProto.Stream.Kind kind
- ) throws IOException {
- final StreamName name = new StreamName(column, kind);
- final EnumSet<CompressionCodec.Modifier> modifiers;
-
+ CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
+ CompressionCodec result = codec;
+ if (codec != null) {
switch (kind) {
case BLOOM_FILTER:
case DATA:
case DICTIONARY_DATA:
case BLOOM_FILTER_UTF8:
- if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
- modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
- CompressionCodec.Modifier.TEXT);
+ if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
+ result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+ CompressionCodec.Modifier.TEXT));
} else {
- modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
- CompressionCodec.Modifier.TEXT);
+ result = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+ CompressionCodec.Modifier.TEXT));
}
break;
case LENGTH:
@@ -445,22 +306,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
case ROW_INDEX:
case SECONDARY:
// easily compressed using the fastest modes
- modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
- CompressionCodec.Modifier.BINARY);
+ result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+ CompressionCodec.Modifier.BINARY));
break;
default:
- LOG.warn("Missing ORC compression modifiers for " + kind);
- modifiers = null;
+ LOG.info("Missing ORC compression modifiers for " + kind);
break;
}
+ }
+ return result;
+ }
- BufferedStream result = streams.get(name);
- if (result == null) {
- result = new BufferedStream(name.toString(), bufferSize,
- codec == null ? codec : codec.modify(modifiers));
- streams.put(name, result);
- }
- return result.outStream;
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return The output outStream that the section needs to be written to.
+ * @throws IOException
+ */
+ public OutStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ final StreamName name = new StreamName(column, kind);
+ CompressionCodec codec = getCustomizedCodec(kind);
+
+ return new OutStream(physicalWriter.toString(), bufferSize, codec,
+ physicalWriter.createDataStream(name));
}
/**
@@ -552,6 +428,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
public OrcFile.BloomFilterVersion getBloomFilterVersion() {
return bloomFilterVersion;
}
+
+ public void writeIndex(StreamName name,
+ OrcProto.RowIndex.Builder index) throws IOException {
+ physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
+ }
+
+ public void writeBloomFilter(StreamName name,
+ OrcProto.BloomFilterIndex.Builder bloom
+ ) throws IOException {
+ physicalWriter.writeBloomFilter(name, bloom,
+ getCustomizedCodec(name.getKind()));
+ }
}
/**
@@ -571,9 +459,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
protected final RowIndexPositionRecorder rowIndexPosition;
private final OrcProto.RowIndex.Builder rowIndex;
private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
- private final PositionedOutputStream rowIndexStream;
- private final PositionedOutputStream bloomFilterStream;
- private final PositionedOutputStream bloomFilterStreamUtf8;
protected final BloomFilter bloomFilter;
protected final BloomFilterUtf8 bloomFilterUtf8;
protected final boolean createBloomFilter;
@@ -613,39 +498,33 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
stripeColStatistics = ColumnStatisticsImpl.create(schema);
fileStatistics = ColumnStatisticsImpl.create(schema);
childrenWriters = new TreeWriter[0];
- rowIndex = OrcProto.RowIndex.newBuilder();
- rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
- rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
- stripeStatsBuilders = new ArrayList<>();
if (streamFactory.buildIndex()) {
- rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+ rowIndex = OrcProto.RowIndex.newBuilder();
+ rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+ rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
} else {
- rowIndexStream = null;
+ rowIndex = null;
+ rowIndexEntry = null;
+ rowIndexPosition = null;
}
+ stripeStatsBuilders = new ArrayList<>();
if (createBloomFilter) {
bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
streamFactory.getBloomFilterFPP());
bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
- bloomFilterStream = streamFactory.createStream(id,
- OrcProto.Stream.Kind.BLOOM_FILTER);;
} else {
bloomFilter = null;
bloomFilterIndex = null;
- bloomFilterStream = null;
}
bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
streamFactory.getBloomFilterFPP());
bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
- bloomFilterStreamUtf8 = streamFactory.createStream(id,
- OrcProto.Stream.Kind.BLOOM_FILTER_UTF8);;
} else {
bloomFilterEntry = null;
bloomFilterIndex = null;
bloomFilterIndexUtf8 = null;
- bloomFilterStreamUtf8 = null;
- bloomFilterStream = null;
bloomFilter = null;
bloomFilterUtf8 = null;
}
@@ -779,7 +658,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
isPresentOutStream.suppress();
// since isPresent bitstream is suppressed, update the index to
// remove the positions of the isPresent stream
- if (rowIndexStream != null) {
+ if (rowIndex != null) {
removeIsPresentPositions();
}
}
@@ -798,28 +677,27 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
if (streamFactory.hasWriterTimeZone()) {
builder.setWriterTimezone(TimeZone.getDefault().getID());
}
- if (rowIndexStream != null) {
+ if (rowIndex != null) {
if (rowIndex.getEntryCount() != requiredIndexEntries) {
throw new IllegalArgumentException("Column has wrong number of " +
"index entries found: " + rowIndex.getEntryCount() + " expected: " +
requiredIndexEntries);
}
- rowIndex.build().writeTo(rowIndexStream);
- rowIndexStream.flush();
+ streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+ rowIndex.clear();
+ rowIndexEntry.clear();
}
- rowIndex.clear();
- rowIndexEntry.clear();
// write the bloom filter to out stream
- if (bloomFilterStream != null) {
- bloomFilterIndex.build().writeTo(bloomFilterStream);
- bloomFilterStream.flush();
+ if (bloomFilterIndex != null) {
+ streamFactory.writeBloomFilter(new StreamName(id,
+ OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
bloomFilterIndex.clear();
}
// write the utf8 bloom filter to out stream
- if (bloomFilterStreamUtf8 != null) {
- bloomFilterIndexUtf8.build().writeTo(bloomFilterStreamUtf8);
- bloomFilterStreamUtf8.flush();
+ if (bloomFilterIndexUtf8 != null) {
+ streamFactory.writeBloomFilter(new StreamName(id,
+ OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
bloomFilterIndexUtf8.clear();
}
}
@@ -899,6 +777,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
*/
long estimateMemory() {
long result = 0;
+ if (isPresent != null) {
+ result = isPresentOutStream.getBufferSize();
+ }
for (TreeWriter child: childrenWriters) {
result += child.estimateMemory();
}
@@ -917,7 +798,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.writer = new BitFieldWriter(out, 1);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -949,7 +832,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
writer.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -957,6 +842,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
writer.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + writer.estimateMemory();
+ }
}
private static class ByteTreeWriter extends TreeWriter {
@@ -969,7 +859,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super(columnId, schema, writer, nullable);
this.writer = new RunLengthByteWriter(writer.createStream(id,
OrcProto.Stream.Kind.DATA));
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1013,7 +905,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
writer.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1021,6 +915,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
writer.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + writer.estimateMemory();
+ }
}
private static class IntegerTreeWriter extends TreeWriter {
@@ -1036,7 +935,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1090,7 +991,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
writer.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1098,6 +1001,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
writer.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + writer.estimateMemory();
+ }
}
private static class FloatTreeWriter extends TreeWriter {
@@ -1112,7 +1020,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.utils = new SerializationUtils();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1157,7 +1067,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
stream.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1165,6 +1077,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
stream.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + stream.getBufferSize();
+ }
}
private static class DoubleTreeWriter extends TreeWriter {
@@ -1179,7 +1096,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
this.utils = new SerializationUtils();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1223,7 +1142,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
stream.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1231,18 +1152,22 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
stream.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + stream.getBufferSize();
+ }
}
private static abstract class StringBaseTreeWriter extends TreeWriter {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final OutStream stringOutput;
- private final IntegerWriter lengthOutput;
+ protected final IntegerWriter lengthOutput;
private final IntegerWriter rowOutput;
protected final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
protected final DynamicIntArray rows = new DynamicIntArray();
protected final PositionedOutputStream directStreamOutput;
- protected final IntegerWriter directLengthOutput;
private final List<OrcProto.RowIndexEntry> savedRowIndex =
new ArrayList<OrcProto.RowIndexEntry>();
private final boolean buildIndex;
@@ -1261,18 +1186,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
+ directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
lengthOutput = createIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- rowOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
+ writer);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
- directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- directLengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
Configuration conf = writer.getConfiguration();
dictionaryKeySizeThreshold =
OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
@@ -1315,16 +1240,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// we need to build the rowindex before calling super, since it
// writes it out.
super.writeStripe(builder, requiredIndexEntries);
- stringOutput.flush();
- lengthOutput.flush();
- rowOutput.flush();
- directStreamOutput.flush();
- directLengthOutput.flush();
+ if (useDictionaryEncoding) {
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ } else {
+ directStreamOutput.flush();
+ lengthOutput.flush();
+ }
// reset all of the fields to be ready for the next stripe.
dictionary.clear();
savedRowIndex.clear();
rowIndexValueCount.clear();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
rowIndexValueCount.add(0L);
if (!useDictionaryEncoding) {
@@ -1374,7 +1304,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
PositionRecorder posn = new RowIndexPositionRecorder(base);
directStreamOutput.getPosition(posn);
- directLengthOutput.getPosition(posn);
+ lengthOutput.getPosition(posn);
}
rowIndex.addEntry(base.build());
}
@@ -1385,7 +1315,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
dictionary.getText(text, rows.get(i));
directStreamOutput.write(text.getBytes(), 0, text.getLength());
- directLengthOutput.write(text.getLength());
+ lengthOutput.write(text.getLength());
}
}
}
@@ -1450,13 +1380,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
private void recordDirectStreamPosition() throws IOException {
- directStreamOutput.getPosition(rowIndexPosition);
- directLengthOutput.getPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ directStreamOutput.getPosition(rowIndexPosition);
+ lengthOutput.getPosition(rowIndexPosition);
+ }
}
@Override
long estimateMemory() {
- return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+ long parent = super.estimateMemory();
+ if (useDictionaryEncoding) {
+ return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
+ } else {
+ return parent + lengthOutput.estimateMemory() +
+ directStreamOutput.getBufferSize();
+ }
}
}
@@ -1484,7 +1422,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
for(int i=0; i < length; ++i) {
directStreamOutput.write(vec.vector[0], vec.start[0],
vec.length[0]);
- directLengthOutput.write(vec.length[0]);
+ lengthOutput.write(vec.length[0]);
}
}
indexStatistics.updateString(vec.vector[0], vec.start[0],
@@ -1507,7 +1445,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
directStreamOutput.write(vec.vector[offset + i],
vec.start[offset + i], vec.length[offset + i]);
- directLengthOutput.write(vec.length[offset + i]);
+ lengthOutput.write(vec.length[offset + i]);
}
indexStatistics.updateString(vec.vector[offset + i],
vec.start[offset + i], vec.length[offset + i], 1);
@@ -1570,7 +1508,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
for(int i=0; i < length; ++i) {
directStreamOutput.write(ptr, ptrOffset, itemLength);
- directLengthOutput.write(itemLength);
+ lengthOutput.write(itemLength);
}
}
indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
@@ -1603,7 +1541,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
rows.add(dictionary.add(ptr, ptrOffset, itemLength));
} else {
directStreamOutput.write(ptr, ptrOffset, itemLength);
- directLengthOutput.write(itemLength);
+ lengthOutput.write(itemLength);
}
indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
if (createBloomFilter) {
@@ -1653,7 +1591,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
for(int i=0; i < length; ++i) {
directStreamOutput.write(vec.vector[0], vec.start[0],
itemLength);
- directLengthOutput.write(itemLength);
+ lengthOutput.write(itemLength);
}
}
indexStatistics.updateString(vec.vector[0], vec.start[0],
@@ -1679,7 +1617,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
directStreamOutput.write(vec.vector[offset + i],
vec.start[offset + i], itemLength);
- directLengthOutput.write(itemLength);
+ lengthOutput.write(itemLength);
}
indexStatistics.updateString(vec.vector[offset + i],
vec.start[offset + i], itemLength, 1);
@@ -1714,7 +1652,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.isDirectV2 = isNewWriteFormat(writer);
this.length = createIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1776,7 +1716,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.writeStripe(builder, requiredIndexEntries);
stream.flush();
length.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1785,6 +1727,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
stream.getPosition(recorder);
length.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + stream.getBufferSize() +
+ length.estimateMemory();
+ }
}
public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
@@ -1809,7 +1757,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
this.nanos = createIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
// for unit tests to set different time zones
this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
writer.useWriterTimeZone(true);
@@ -1875,7 +1825,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.writeStripe(builder, requiredIndexEntries);
seconds.flush();
nanos.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
private static long formatNanos(int nanos) {
@@ -1900,6 +1852,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
seconds.getPosition(recorder);
nanos.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + seconds.estimateMemory() +
+ nanos.estimateMemory();
+ }
}
private static class DateTreeWriter extends TreeWriter {
@@ -1915,7 +1873,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcProto.Stream.Kind.DATA);
this.isDirectV2 = isNewWriteFormat(writer);
this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1959,7 +1919,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
writer.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -1977,6 +1939,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return OrcProto.ColumnEncoding.newBuilder()
.setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + writer.estimateMemory();
+ }
}
private static class DecimalTreeWriter extends TreeWriter {
@@ -2000,7 +1967,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
this.scaleStream = createIntegerWriter(writer.createStream(id,
OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2060,7 +2029,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.writeStripe(builder, requiredIndexEntries);
valueStream.flush();
scaleStream.flush();
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2069,6 +2040,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
valueStream.getPosition(recorder);
scaleStream.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + valueStream.getBufferSize() +
+ scaleStream.estimateMemory();
+ }
}
private static class StructTreeWriter extends TreeWriter {
@@ -2084,7 +2061,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
children.get(i), writer,
true);
}
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2148,7 +2127,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
for(TreeWriter child: childrenWriters) {
child.writeStripe(builder, requiredIndexEntries);
}
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
}
@@ -2167,7 +2148,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
createTreeWriter(schema.getChildren().get(0), writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2243,7 +2226,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
for(TreeWriter child: childrenWriters) {
child.writeStripe(builder, requiredIndexEntries);
}
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2251,6 +2236,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
lengths.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + lengths.estimateMemory();
+ }
}
private static class MapTreeWriter extends TreeWriter {
@@ -2271,7 +2261,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
createTreeWriter(children.get(1), writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2352,7 +2344,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
for(TreeWriter child: childrenWriters) {
child.writeStripe(builder, requiredIndexEntries);
}
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2360,6 +2354,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
lengths.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + lengths.estimateMemory();
+ }
}
private static class UnionTreeWriter extends TreeWriter {
@@ -2379,7 +2378,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
tags =
new RunLengthByteWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.DATA));
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2451,7 +2452,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
for(TreeWriter child: childrenWriters) {
child.writeStripe(builder, requiredIndexEntries);
}
- recordPosition(rowIndexPosition);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
}
@Override
@@ -2459,6 +2462,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.recordPosition(recorder);
tags.getPosition(recorder);
}
+
+ @Override
+ long estimateMemory() {
+ return super.estimateMemory() + tags.estimateMemory();
+ }
}
private static TreeWriter createTreeWriter(TypeDescription schema,
@@ -2609,27 +2617,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- // @VisibleForTesting
- public FSDataOutputStream getStream() throws IOException {
- if (rawWriter == null) {
- rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
- fs.getDefaultReplication(path), blockSize);
- rawWriter.writeBytes(OrcFile.MAGIC);
- headerLength = rawWriter.getPos();
- writer = new OutStream("metadata", bufferSize, codec,
- new DirectStream(rawWriter));
- protobufWriter = CodedOutputStream.newInstance(writer);
- }
- return rawWriter;
- }
-
private void createRowIndexEntry() throws IOException {
treeWriter.createRowIndexEntry();
rowsInIndex = 0;
}
private void flushStripe() throws IOException {
- getStream();
if (buildIndex && rowsInIndex != 0) {
createRowIndexEntry();
}
@@ -2643,95 +2636,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcProto.StripeFooter.Builder builder =
OrcProto.StripeFooter.newBuilder();
treeWriter.writeStripe(builder, requiredIndexEntries);
- long indexSize = 0;
- long dataSize = 0;
- for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
- BufferedStream stream = pair.getValue();
- if (!stream.isSuppressed()) {
- stream.flush();
- StreamName name = pair.getKey();
- long streamSize = pair.getValue().getOutputSize();
- builder.addStreams(OrcProto.Stream.newBuilder()
- .setColumn(name.getColumn())
- .setKind(name.getKind())
- .setLength(streamSize));
- if (StreamName.Area.INDEX == name.getArea()) {
- indexSize += streamSize;
- } else {
- dataSize += streamSize;
- }
- }
- }
- OrcProto.StripeFooter footer = builder.build();
-
- // Do we need to pad the file so the stripe doesn't straddle a block
- // boundary?
- long start = rawWriter.getPos();
- final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize();
- final long available = blockSize - (start % blockSize);
- final long overflow = currentStripeSize - adjustedStripeSize;
- final float availRatio = (float) available / (float) defaultStripeSize;
-
- if (availRatio > 0.0f && availRatio < 1.0f
- && availRatio > paddingTolerance) {
- // adjust default stripe size to fit into remaining space, also adjust
- // the next stripe for correction based on the current stripe size
- // and user specified padding tolerance. Since stripe size can overflow
- // the default stripe size we should apply this correction to avoid
- // writing portion of last stripe to next hdfs block.
- double correction = overflow > 0 ? (double) overflow
- / (double) adjustedStripeSize : 0.0;
-
- // correction should not be greater than user specified padding
- // tolerance
- correction = correction > paddingTolerance ? paddingTolerance
- : correction;
-
- // adjust next stripe size based on current stripe estimate correction
- adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
- } else if (availRatio >= 1.0) {
- adjustedStripeSize = defaultStripeSize;
- }
-
- if (availRatio < paddingTolerance && addBlockPadding) {
- long padding = blockSize - (start % blockSize);
- byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
- LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
- padding, availRatio, defaultStripeSize));
- start += padding;
- while (padding > 0) {
- int writeLen = (int) Math.min(padding, pad.length);
- rawWriter.write(pad, 0, writeLen);
- padding -= writeLen;
- }
- adjustedStripeSize = defaultStripeSize;
- } else if (currentStripeSize < blockSize
- && (start % blockSize) + currentStripeSize > blockSize) {
- // even if you don't pad, reset the default stripe size when crossing a
- // block boundary
- adjustedStripeSize = defaultStripeSize;
- }
-
- // write out the data streams
- for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
- BufferedStream stream = pair.getValue();
- if (!stream.isSuppressed()) {
- stream.spillTo(rawWriter);
- }
- stream.clear();
- }
- footer.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- long footerLength = rawWriter.getPos() - start - dataSize - indexSize;
- OrcProto.StripeInformation dirEntry =
+ OrcProto.StripeInformation.Builder dirEntry =
OrcProto.StripeInformation.newBuilder()
- .setOffset(start)
- .setNumberOfRows(rowsInStripe)
- .setIndexLength(indexSize)
- .setDataLength(dataSize)
- .setFooterLength(footerLength).build();
- stripes.add(dirEntry);
+ .setNumberOfRows(rowsInStripe);
+ physicalWriter.finalizeStripe(builder, dirEntry);
+ stripes.add(dirEntry.build());
rowCount += rowsInStripe;
rowsInStripe = 0;
}
@@ -2812,30 +2721,33 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- private int writeMetadata() throws IOException {
- getStream();
+ private void writeMetadata() throws IOException {
OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
builder.addStripeStats(ssb.build());
}
+ physicalWriter.writeFileMetadata(builder);
+ }
- long startPosn = rawWriter.getPos();
- OrcProto.Metadata metadata = builder.build();
- metadata.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- return (int) (rawWriter.getPos() - startPosn);
+ private long writePostScript() throws IOException {
+ OrcProto.PostScript.Builder builder =
+ OrcProto.PostScript.newBuilder()
+ .setCompression(writeCompressionKind(compress))
+ .setMagic(OrcFile.MAGIC)
+ .addVersion(version.getMajor())
+ .addVersion(version.getMinor())
+ .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
+ if (compress != CompressionKind.NONE) {
+ builder.setCompressionBlockSize(bufferSize);
+ }
+ return physicalWriter.writePostScript(builder);
}
- private int writeFooter(long bodyLength) throws IOException {
- getStream();
+ private long writeFooter() throws IOException {
+ writeMetadata();
OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
- builder.setContentLength(bodyLength);
- builder.setHeaderLength(headerLength);
builder.setNumberOfRows(rowCount);
builder.setRowIndexStride(rowIndexStride);
- // populate raw data size
- rawDataSize = computeRawDataSize();
// serialize the types
writeTypes(builder, schema);
// add the stripe information
@@ -2849,45 +2761,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
.setName(entry.getKey()).setValue(entry.getValue()));
}
- long startPosn = rawWriter.getPos();
- OrcProto.Footer footer = builder.build();
- footer.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- return (int) (rawWriter.getPos() - startPosn);
- }
-
- private int writePostScript(int footerLength, int metadataLength) throws IOException {
- OrcProto.PostScript.Builder builder =
- OrcProto.PostScript.newBuilder()
- .setCompression(writeCompressionKind(compress))
- .setFooterLength(footerLength)
- .setMetadataLength(metadataLength)
- .setMagic(OrcFile.MAGIC)
- .addVersion(version.getMajor())
- .addVersion(version.getMinor())
- .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
- if (compress != CompressionKind.NONE) {
- builder.setCompressionBlockSize(bufferSize);
- }
- OrcProto.PostScript ps = builder.build();
- // need to write this uncompressed
- long startPosn = rawWriter.getPos();
- ps.writeTo(rawWriter);
- long length = rawWriter.getPos() - startPosn;
- if (length > 255) {
- throw new IllegalArgumentException("PostScript too large at " + length);
- }
- return (int) length;
- }
-
- private long estimateStripeSize() {
- long result = 0;
- for(BufferedStream stream: streams.values()) {
- result += stream.getBufferSize();
- }
- result += treeWriter.estimateMemory();
- return result;
+ physicalWriter.writeFileFooter(builder);
+ return writePostScript();
}
@Override
@@ -2933,11 +2808,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
memoryManager.removeWriter(path);
// actually close the file
flushStripe();
- int metadataLength = writeMetadata();
- int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
- rawWriter.writeByte(writePostScript(footerLength, metadataLength));
- rawWriter.close();
-
+ lastFlushOffset = writeFooter();
+ physicalWriter.close();
}
/**
@@ -2967,13 +2839,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
if (callback != null) {
callback.preFooterWrite(callbackContext);
}
- int metaLength = writeMetadata();
- int footLength = writeFooter(rawWriter.getPos() - metaLength);
- rawWriter.writeByte(writePostScript(footLength, metaLength));
+ lastFlushOffset = writeFooter();
stripesAtLastFlush = stripes.size();
- rawWriter.hflush();
+ physicalWriter.flush();
}
- return rawWriter.getPos();
+ return lastFlushOffset;
}
static void checkArgument(boolean expression, String message) {
@@ -2993,28 +2863,15 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
checkArgument(stripeStatistics != null,
"Stripe statistics must not be null");
- getStream();
- long start = rawWriter.getPos();
- long availBlockSpace = blockSize - (start % blockSize);
-
- // see if stripe can fit in the current hdfs block, else pad the remaining
- // space in the block
- if (length < blockSize && length > availBlockSpace &&
- addBlockPadding) {
- byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
- LOG.info(String.format("Padding ORC by %d bytes while merging..",
- availBlockSpace));
- start += availBlockSpace;
- while (availBlockSpace > 0) {
- int writeLen = (int) Math.min(availBlockSpace, pad.length);
- rawWriter.write(pad, 0, writeLen);
- availBlockSpace -= writeLen;
- }
- }
-
- rawWriter.write(stripe);
- rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
- rowCount += rowsInStripe;
+ // update stripe information
+ OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
+ .newBuilder()
+ .setNumberOfRows(rowsInStripe)
+ .setIndexLength(stripeInfo.getIndexLength())
+ .setDataLength(stripeInfo.getDataLength())
+ .setFooterLength(stripeInfo.getFooterLength());
+ physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length),
+ dirEntry);
// since we have already written the stripe, just update stripe statistics
treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
@@ -3022,16 +2879,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// update file level statistics
updateFileStatistics(stripeStatistics);
- // update stripe information
- OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
- .newBuilder()
- .setOffset(start)
- .setNumberOfRows(rowsInStripe)
- .setIndexLength(stripeInfo.getIndexLength())
- .setDataLength(stripeInfo.getDataLength())
- .setFooterLength(stripeInfo.getFooterLength())
- .build();
- stripes.add(dirEntry);
+ stripes.add(dirEntry.build());
// reset it after writing the stripe
rowsInStripe = 0;
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 2448cb7..7df521d 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -2117,8 +2117,8 @@ public class TestVectorOrcFile {
int i = 0;
for(StripeInformation stripe: reader.getStripes()) {
i += 1;
- assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(),
- stripe.getDataLength() < 5000);
+ assertTrue(testFilePath + " stripe " + i + " is too long at " +
+ stripe.getDataLength(), stripe.getDataLength() < 5000);
}
// with HIVE-7832, the dictionaries will be disabled after writing the first
// stripe as there are too many distinct values. Hence only 3 stripes as
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/test/org/apache/orc/impl/TestInStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index eea2ab2..d40676c 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -31,11 +31,12 @@ import java.util.List;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.PhysicalWriter;
import org.junit.Test;
public class TestInStream {
- static class OutputCollector implements OutStream.OutputReceiver {
+ static class OutputCollector implements PhysicalWriter.OutputReceiver {
DynamicByteArray buffer = new DynamicByteArray();
@Override
@@ -43,6 +44,11 @@ public class TestInStream {
this.buffer.add(buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
}
+
+ @Override
+ public void suppress() {
+ // PASS
+ }
}
static class PositionCollector
http://git-wip-us.apache.org/repos/asf/orc/blob/ca7c97eb/java/core/src/test/org/apache/orc/impl/TestOutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index e9614d5..77aae06 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -19,6 +19,7 @@
package org.apache.orc.impl;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.PhysicalWriter;
import org.junit.Test;
import org.mockito.Mockito;
@@ -30,8 +31,8 @@ public class TestOutStream {
@Test
public void testFlush() throws Exception {
- OutStream.OutputReceiver receiver =
- Mockito.mock(OutStream.OutputReceiver.class);
+ PhysicalWriter.OutputReceiver receiver =
+ Mockito.mock(PhysicalWriter.OutputReceiver.class);
CompressionCodec codec = new ZlibCodec();
OutStream stream = new OutStream("test", 128*1024, codec, receiver);
assertEquals(0L, stream.getBufferSize());