Posted to user@hive.apache.org by Leo Alekseyev <dn...@gmail.com> on 2010/11/18 03:00:39 UTC

Hive produces very small files despite hive.merge...=true settings

I have jobs that sample (or generate) a small amount of data from a
large table.  At the end, I get e.g. about 3000 or more files of 1kb
or so.  This becomes a nuisance.  How can I make Hive do another pass
to merge the output?  I have the following settings:

hive.merge.mapfiles=true
hive.merge.mapredfiles=true
hive.merge.size.per.task=256000000
hive.merge.size.smallfiles.avgsize=16000000

After setting hive.merge* to true, Hive started indicating "Total
MapReduce jobs = 2".  However, after generating the
lots-of-small-files table, Hive says:
Ended Job = job_201011021934_1344
Ended Job = 781771542, job is filtered out (removed at runtime).

Is there a way to force the merge, or am I missing something?
--Leo

Re: Hive produces very small files despite hive.merge...=true settings

Posted by yongqiang he <he...@gmail.com>.
I don't think this could be the cause.

The more likely problem is that your files cannot be merged, i.e. the
file size is bigger than the split size.

On Friday, November 19, 2010, Leo Alekseyev <dn...@gmail.com> wrote:
> Folks, thanks for your help.  I've narrowed the problem down to
> compression.  When I set hive.exec.compress.output=false, merges
> proceed as expected.  When compression is on, the merge job doesn't
> seem to actually merge, it just spits out the input.

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Ning Zhang <nz...@fb.com>.
This is expected. Compressed text files are not splittable, so CombineHiveInputFormat cannot read multiple files per mapper. CombineHiveInputFormat is used when hive.mergejob.maponly=true. If you set it to false, we'll use HiveInputFormat, and that should be able to merge compressed text files.
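
A minimal sketch of the settings described above, assuming the goal is to keep compressed text output and still get a merge pass. The property names are the ones used elsewhere in this thread (hive.mergejob.maponly is the spelling from the earlier reply). Whether the merge actually runs also depends on the file sizes and on the Hive build, so treat this as something to try rather than a guaranteed fix:

```sql
-- Keep output compression on; with text output this is the case where
-- the map-only merge does not combine files.
set hive.exec.compress.output=true;

-- Ask Hive to schedule a merge stage after map-only and map-reduce jobs.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;

-- Assumption: move the merge job off the map-only CombineHiveInputFormat
-- path so that it runs with HiveInputFormat instead.
set hive.mergejob.maponly=false;
```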


On Nov 22, 2010, at 10:05 PM, Leo Alekseyev wrote:

> I found another criterion that determines whether or not the merge job
> runs with compression turned on.  It seems that if the target table is
> stored as an rcfile, merges work, but if a text file, merges will
> fail.  For instance:
> 
> -- merge will work here:
> create table  alogs_dbg_sample3 (server_host STRING, client_ip INT,
> time_stamp INT) row format serde
> 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' stored as
> rcfile;
> insert overwrite table alogs_dbg_sample3 select
> server_host,client_ip,time_stamp from alogs_dbg_subset
> TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
> 
> -- merge will fail here:
> create table alogs_dbg_sample2 (server_host STRING, client_ip INT,
> time_stamp INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES
> TERMINATED BY '\n' STORED AS TEXTFILE;
> insert overwrite table alogs_dbg_sample2 select
> server_host,client_ip,time_stamp from alogs_dbg_subset
> TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;
> 
> Is this a bug?..  I don't see why merge job running should be
> sensitive to the output table format.
> P.S:  I have hive.merge.maponly=true and am using LZO compression
> 
> On Fri, Nov 19, 2010 at 5:20 PM, Ning Zhang <nz...@fb.com> wrote:
>> It makes sense. CombineHiveInputFormat does not work with compressed text files (suffix *.gz) since they are not splittable. I think your default is hive.input.format=CombineHiveInputFormat. But I think by setting hive.mergejob.maponly=false it should work (meaning the merge should succeed). With hive.mergejob.maponly=false, you'll have multiple mappers (the same # as small files) and 1 reducer. The reducer's output should be the merged result.
>> 
>> On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:
>> 
>>> Folks, thanks for your help.  I've narrowed the problem down to
>>> compression.  When I set hive.exec.compress.output=false, merges
>>> proceed as expected.  When compression is on, the merge job doesn't
>>> seem to actually merge, it just spits out the input.
>>> 
>>> On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <he...@gmail.com> wrote:
>>>> These are the parameters that control the behavior. (Try to set them
>>>> to different values if it does not work in your environment.)
>>>> 
>>>> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
>>>> set mapred.min.split.size.per.node=1000000000;
>>>> set mapred.min.split.size.per.rack=1000000000;
>>>> set mapred.max.split.size=1000000000;
>>>> 
>>>> set hive.merge.size.per.task=1000000000;
>>>> set hive.merge.smallfiles.avgsize=1000000000;
>>>> set hive.merge.size.smallfiles.avgsize=1000000000;
>>>> set hive.exec.dynamic.partition.mode=nonstrict;
>>>> 
>>>> 
>>>> The output size of the second job is also controlled by the split
>>>> size, as shown in the first 4 lines.
>>>> 
>>>> 
>>>> On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dn...@gmail.com> wrote:
>>>>> I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
>>>>> worked for me in the past.  Again, what's strange here is with the
>>>>> latest Hive build the merge stage appears to run, but it doesn't
>>>>> actually merge -- it's a quick map-only job that, near as I can tell,
>>>>> doesn't do anything.
>>>>> 
>>>>> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <db...@geek.net> wrote:
>>>>>> What version of Hadoop are you on?
>>>>>> 
>>>>>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dn...@gmail.com> wrote:
>>>>>>> 
>>>>>>> I thought I was running Hive with those changes merged in, but to make
>>>>>>> sure, I built the latest trunk version.  The behavior changed somewhat
>>>>>>> (as in, it runs 2 stages instead of 1), but it still generates the
>>>>>>> same number of files (# of files generated is equal to the number of
>>>>>>> the original mappers, so I have no idea what the second stage is
>>>>>>> actually doing).
>>>>>>> 
>>>>>>> See below for query / explain query.  Stage 1 runs always; Stage 3
>>>>>>> runs if hive.merge.mapfiles=true is set, but it still generates lots
>>>>>>> of small files.
>>>>>>> 
>>>>>>> The query is kind of large, but in essence it's simply
>>>>>>> insert overwrite table foo partition(bar) select [columns] from
>>>>>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>>>>>> [conditions].
>>>>>>> 
>>>>>>> 
>>>>>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>>>>>> (ds) select
>>>>>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>>>>>> 'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
>>>>>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>>>>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>>>>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
>>>>>>> OK
>>>>>>> ABSTRACT SYNTAX TREE:
>>>>>>>  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>>>>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>>>>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>>>>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>>>>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>>>>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>>>>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>>>>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>>>>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>>>>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>>>>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>>>>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>>>>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>>>>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>>>>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>>>>>> geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>>>>>> './GeoIP.dat') 'US')))))
>>>>>>> 
>>>>>>> STAGE DEPENDENCIES:
>>>>>>>  Stage-1 is a root stage
>>>>>>>  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>>>>>  Stage-4
>>>>>>>  Stage-0 depends on stages: Stage-4, Stage-3
>>>>>>>  Stage-2 depends on stages: Stage-0
>>>>>>>  Stage-3
>>>>>>> 
>>>>>>> STAGE PLANS:
>>>>>>>  Stage: Stage-1
>>>>>>>    Map Reduce
>>>>>>>      Alias -> Map Operator Tree:
>>>>>>>        am_s
>>>>>>>          TableScan
>>>>>>>            alias: am_s
>>>>>>>            Filter Operator
>>>>>>>              predicate:
>>>>>>>                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>>>>>                  type: boolean
>>>>>>>              Filter Operator
>>>>>>>                predicate:
>>>>>>>                    expr: ((request_url rlike
>>>>>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>>                    type: boolean
>>>>>>>                Filter Operator
>>>>>>>                  predicate:
>>>>>>>                      expr: (((ds = '2010-11-05') and (request_url
>>>>>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>>>>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>>>>>                      type: boolean
>>>>>>>                  Select Operator
>>>>>>>                    expressions:
>>>>>>>                          expr: server_host
>>>>>>>                          type: string
>>>>>>>                          expr: client_ip
>>>>>>>                          type: int
>>>>>>>                          expr: time_stamp
>>>>>>>                          type: int
>>>>>>>                          expr: concat(server_host, ':',
>>>>>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>>>>>                          type: string
>>>>>>>                          expr: referrer
>>>>>>>                          type: string
>>>>>>>                          expr: parse_url(referrer, 'HOST')
>>>>>>>                          type: string
>>>>>>>                          expr: user_agent
>>>>>>>                          type: string
>>>>>>>                          expr: cookie
>>>>>>>                          type: string
>>>>>>>                          expr: GenericUDFGeoIP ( client_ip,
>>>>>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>>>>>                          type: string
>>>>>>>                          expr: ''
>>>>>>>                          type: string
>>>>>>>                          expr: ds
>>>>>>>                          type: string
>>>>>>>                    outputColumnNames: _col0, _col1, _col2, _col3,
>>>>>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>>>>>                    File Output Operator
>>>>>>>                      compressed: true
>>>>>>>                      GlobalTableId: 1
>>>>>>>                      table:
>>>>>>>                          input format:
>>>>>>> org.apache.hadoop.mapred.TextInputFormat
>>>>>>>                          output format:
>>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>                          serde:
>>>>>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>                          name: hbase_prefilter3_us_sample
>>>>>>> 
>>>>>>>  Stage: Stage-5
>>>>>>>    Conditional Operator
>>>>>>> 
>>>>>>>  Stage: Stage-4
>>>>>>>    Move Operator
>>>>>>>      files:
>>>>>>>          hdfs directory: true
>>>>>>>          destination:
>>>>>>> 
>>>>>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>>>>> 
>>>>>>>  Stage: Stage-0
>>>>>>>    Move Operator
>>>>>>>      tables:
>>>>>>>          partition:
>>>>>>>            ds
>>>>>>>          replace: true
>>>>>>>          table:
>>>>>>>              input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>              output format:
>>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>              name: hbase_prefilter3_us_sample
>>>>>>> 
>>>>>>>  Stage: Stage-2
>>>>>>>    Stats-Aggr Operator
>>>>>>> 
>>>>>>>  Stage: Stage-3
>>>>>>>    Map Reduce
>>>>>>>      Alias -> Map Operator Tree:
>>>>>>> 
>>>>>>>  hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>>>>>            File Output Operator
>>>>>>>              compressed: true
>>>>>>>              GlobalTableId: 0
>>>>>>>              table:
>>>>>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>>>>>                  output format:
>>>>>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>>>>>                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>>>>>                  name: hbase_prefilter3_us_sample
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nz...@fb.com> wrote:
>>>>>>>> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>>>>>>> to be there for merging to take place. HIVE-1307 was committed to trunk on
>>>>>>>> 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update
>>>>>>>> your Hive trunk and rerun the query. If it still doesn't work maybe you can
>>>>>>>> post your query and the result of 'explain <query>' and we can take a look.
>>>>>>>> 
>>>>>>>> Ning
>>>>>>>> 
>>>>>>>> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>>>>>>> 
>>>>>>>>> Hi Ning,
>>>>>>>>> For the dataset I'm experimenting with, the total size of the output
>>>>>>>>> is 2mb, and the files are at most a few kb in size.  My
>>>>>>>>> hive.input.format was set to default HiveInputFormat; however, when I
>>>>>>>>> set it to CombineHiveInputFormat, it only made the first stage of the
>>>>>>>>> job use fewer mappers.  The merge job was *still* filtered out at
>>>>>>>>> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>>>>>>>>> have any effect.
>>>>>>>>> 
>>>>>>>>> I am a bit at a loss what to do here.  Is there a way to see what's
>>>>>>>>> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>>>>>>>>> dynamic partitions; could that somehow be interfering with the merge
>>>>>>>>> job?..
>>>>>>>>> 
>>>>>>>>> I'm running a relatively fresh Hive from trunk (built maybe a month
>>>>>>>>> ago).
>>>>>>>>> 
>>>>>>>>> --Leo
>>>>>>>>> 
>>>>>>>>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nz...@fb.com> wrote:
>>>>>>>>>> The settings looks good. The parameter
>>>>>>>>>> hive.merge.size.smallfiles.avgsize is used to determine at run time if a
>>>>>>>>>> merge should be triggered: if the average size of the files in the partition
>>>>>>>>>> is SMALLER than the parameter and there are more than 1 file, the merge
>>>>>>>>>> should be scheduled. Can you try to see if you have any big files as well in
>>>>>>>>>> your resulting partition? If it is because of a very large file, you can set
>>>>>>>>>> the parameter large enough.
>>>>>>>>>> 
>>>>>>>>>> Another possibility is that your Hadoop installation does not support
>>>>>>>>>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>>>>>>>>> reported previously merge was not successful because of this. If that's the
>>>>>>>>>> case, you can turn off CombineHiveInputFormat and use the old
>>>>>>>>>> HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>>>>>>>>> 
>>>>>>>>>> Ning
>>>>>>>>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>>>>>>>> 
>>>>>>>>>>> I have jobs that sample (or generate) a small amount of data from a
>>>>>>>>>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>>>>>>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>>>>>>>>> to merge the output?  I have the following settings:
>>>>>>>>>>> 
>>>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>>>>>>>> 
>>>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>>>> MapReduce jobs = 2".  However, after generating the
>>>>>>>>>>> lots-of-small-files table, Hive says:
>>>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>>> 
>>>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>>>> --Leo
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Dave Brondsema
>>>>>> Software Engineer
>>>>>> Geeknet
>>>>>> 
>>>>>> www.geek.net
>>>>>> 
>>>>> 
>>>> 
>> 
>> 


Re: Hive produces very small files despite hive.merge...=true settings

Posted by Leo Alekseyev <dn...@gmail.com>.
I found another criterion that determines whether or not the merge job
runs with compression turned on.  It seems that if the target table is
stored as an rcfile, merges work, but if a text file, merges will
fail.  For instance:

-- merge will work here:
create table  alogs_dbg_sample3 (server_host STRING, client_ip INT,
time_stamp INT) row format serde
'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' stored as
rcfile;
insert overwrite table alogs_dbg_sample3 select
server_host,client_ip,time_stamp from alogs_dbg_subset
TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;

-- merge will fail here:
create table alogs_dbg_sample2 (server_host STRING, client_ip INT,
time_stamp INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES
TERMINATED BY '\n' STORED AS TEXTFILE;
insert overwrite table alogs_dbg_sample2 select
server_host,client_ip,time_stamp from alogs_dbg_subset
TABLESAMPLE(BUCKET 1 OUT OF 1000 ON rand()) s;

Is this a bug?..  I don't see why merge job running should be
sensitive to the output table format.
P.S:  I have hive.merge.maponly=true and am using LZO compression
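
For anyone reproducing this, one way to compare the two cases is to count the files under each table's directory from the Hive CLI after the insert. This is only a sketch: the paths assume the default warehouse location (/user/hive/warehouse), which may differ on your cluster.

```sql
-- Each line of dfs -count output is: DIR_COUNT FILE_COUNT CONTENT_SIZE PATH
dfs -count /user/hive/warehouse/alogs_dbg_sample3;
dfs -count /user/hive/warehouse/alogs_dbg_sample2;
```

If the merge ran, the file count should be much smaller than the number of mappers in the first stage.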

>>>>>>>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>>>>>>>> to merge the output?  I have the following settings:
>>>>>>>>>>
>>>>>>>>>> hive.merge.mapfiles=true
>>>>>>>>>> hive.merge.mapredfiles=true
>>>>>>>>>> hive.merge.size.per.task=256000000
>>>>>>>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>>>>>>>
>>>>>>>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>>>>>>>> MapReduce jobs = 2".  However, after generating the
>>>>>>>>>> lots-of-small-files table, Hive says:
>>>>>>>>>> Ended Job = job_201011021934_1344
>>>>>>>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>>>>>>>
>>>>>>>>>> Is there a way to force the merge, or am I missing something?
>>>>>>>>>> --Leo
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Dave Brondsema
>>>>> Software Engineer
>>>>> Geeknet
>>>>>
>>>>> www.geek.net
>>>>>
>>>>
>>>
>
>

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Ning Zhang <nz...@fb.com>.
That makes sense. CombineHiveInputFormat does not work with compressed
text files (suffix *.gz) because they are not splittable. I think your
hive.input.format is set to CombineHiveInputFormat. But I think setting
hive.mergejob.maponly=false should make it work (meaning the merge
should succeed). With hive.mergejob.maponly=false, you'll have multiple
mappers (as many as there are small files) and 1 reducer. The reducer's
output should be the merged result.
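
A minimal sketch of the settings this suggestion implies, assuming the
property names used earlier in this thread (hive.merge.mapfiles,
hive.merge.mapredfiles, hive.mergejob.maponly) are the ones your Hive
build reads. It has not been checked against a particular release:

```sql
-- Keep the post-query merge step enabled.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;

-- Run the merge as a map-reduce job instead of a map-only
-- CombineHiveInputFormat job; per the description above, the
-- reducer then writes the merged result.
set hive.mergejob.maponly=false;
```

The trade-off is one mapper per small file plus a reduce phase, so the
merge is expected to be slower than the map-only variant.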

On Nov 19, 2010, at 1:22 PM, Leo Alekseyev wrote:

> Folks, thanks for your help.  I've narrowed the problem down to
> compression.  When I set hive.exec.compress.output=false, merges
> proceed as expected.  When compression is on, the merge job doesn't
> seem to actually merge, it just spits out the input.
> 


Re: Hive produces very small files despite hive.merge...=true settings

Posted by Leo Alekseyev <dn...@gmail.com>.
Folks, thanks for your help.  I've narrowed the problem down to
compression.  When I set hive.exec.compress.output=false, merges
proceed as expected.  When compression is on, the merge job doesn't
seem to actually merge, it just spits out the input.
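
The finding above can be written as session-level settings. This is a
sketch only, assuming uncompressed output is acceptable for the target
table; it uses just the properties already named in this thread:

```sql
-- Write uncompressed output so the merge job can combine the files.
set hive.exec.compress.output=false;

-- Leave the merge step itself enabled.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
```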

On Fri, Nov 19, 2010 at 10:51 AM, yongqiang he <he...@gmail.com> wrote:
> These are the parameters that control the behavior. (Try to set them
> to different values if it does not work in your environment.)
>
> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
> set mapred.min.split.size.per.node=1000000000;
> set mapred.min.split.size.per.rack=1000000000;
> set mapred.max.split.size=1000000000;
>
> set hive.merge.size.per.task=1000000000;
> set hive.merge.smallfiles.avgsize=1000000000;
> set hive.merge.size.smallfiles.avgsize=1000000000;
> set hive.exec.dynamic.partition.mode=nonstrict;
>
>
> The output size of the second job is also controlled by the split
> size, as shown in the first 4 lines.
>
>

Re: Hive produces very small files despite hive.merge...=true settings

Posted by yongqiang he <he...@gmail.com>.
These are the parameters that control the behavior. (Try to set them
to different values if it does not work in your environment.)

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapred.min.split.size.per.node=1000000000;
set mapred.min.split.size.per.rack=1000000000;
set mapred.max.split.size=1000000000;

set hive.merge.size.per.task=1000000000;
set hive.merge.smallfiles.avgsize=1000000000;
set hive.merge.size.smallfiles.avgsize=1000000000;
set hive.exec.dynamic.partition.mode=nonstrict;


The output size of the second job is also controlled by the split
size, as shown in the first 4 lines.
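
As an illustration of trying different values, here is a sketch that
targets merged files of roughly 256 MB instead of 1 GB. The figure
256000000 is borrowed from the hive.merge.size.per.task value in the
original question; it is an assumption for illustration, not a tuned
recommendation:

```sql
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

-- Split sizes bound how much input each merge mapper combines,
-- and therefore how large each merged output file can get.
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=256000000;
set mapred.min.split.size.per.rack=256000000;

-- Target size of the merged files.
set hive.merge.size.per.task=256000000;
```

The per-node and per-rack minimums are kept no larger than the maximum
split size so that the three values stay mutually consistent.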


On Fri, Nov 19, 2010 at 10:22 AM, Leo Alekseyev <dn...@gmail.com> wrote:
> I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
> worked for me in the past.  Again, what's strange here is with the
> latest Hive build the merge stage appears to run, but it doesn't
> actually merge -- it's a quick map-only job that, near as I can tell,
> doesn't do anything.
>
> On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <db...@geek.net> wrote:
>> What version of Hadoop are you on?
>>
>> On Thu, Nov 18, 2010 at 10:48 PM, Leo Alekseyev <dn...@gmail.com> wrote:
>>>
>>> I thought I was running Hive with those changes merged in, but to make
>>> sure, I built the latest trunk version.  The behavior changed somewhat
>>> (as in, it runs 2 stages instead of 1), but it still generates the
>>> same number of files (# of files generated is equal to the number of
>>> the original mappers, so I have no idea what the second stage is
>>> actually doing).
>>>
>>> See below for query / explain query.  Stage 1 runs always; Stage 3
>>> runs if hive.merge.mapfiles=true is set, but it still generates lots
>>> of small files.
>>>
>>> The query is kind of large, but in essence it's simply
>>> insert overwrite table foo partition(bar) select [columns] from
>>> [table] tablesample(bucket 1 out of 10000 on rand()) where
>>> [conditions].
>>>
>>>
>>> explain insert overwrite table hbase_prefilter3_us_sample partition
>>> (ds) select
>>> server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
>>> 'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
>>> TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
>>> am_s.ds='2010-11-05' and am_s.request_url rlike
>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
>>> geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
>>> OK
>>> ABSTRACT SYNTAX TREE:
>>>  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
>>> 10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
>>> (TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
>>> (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
>>> (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>> time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
>>> server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
>>> request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
>>> (TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
>>> (TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>> user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
>>> (TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
>>> './GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
>>> (TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
>>> (rlike (. (TOK_TABLE_OR_COL am_s) request_url)
>>> '^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
>>> geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
>>> './GeoIP.dat') 'US')))))
>>>
>>> STAGE DEPENDENCIES:
>>>  Stage-1 is a root stage
>>>  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
>>>  Stage-4
>>>  Stage-0 depends on stages: Stage-4, Stage-3
>>>  Stage-2 depends on stages: Stage-0
>>>  Stage-3
>>>
>>> STAGE PLANS:
>>>  Stage: Stage-1
>>>    Map Reduce
>>>      Alias -> Map Operator Tree:
>>>        am_s
>>>          TableScan
>>>            alias: am_s
>>>            Filter Operator
>>>              predicate:
>>>                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
>>>                  type: boolean
>>>              Filter Operator
>>>                predicate:
>>>                    expr: ((request_url rlike
>>> '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>                    type: boolean
>>>                Filter Operator
>>>                  predicate:
>>>                      expr: (((ds = '2010-11-05') and (request_url
>>> rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
>>> (GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
>>>                      type: boolean
>>>                  Select Operator
>>>                    expressions:
>>>                          expr: server_host
>>>                          type: string
>>>                          expr: client_ip
>>>                          type: int
>>>                          expr: time_stamp
>>>                          type: int
>>>                          expr: concat(server_host, ':',
>>> regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
>>>                          type: string
>>>                          expr: referrer
>>>                          type: string
>>>                          expr: parse_url(referrer, 'HOST')
>>>                          type: string
>>>                          expr: user_agent
>>>                          type: string
>>>                          expr: cookie
>>>                          type: string
>>>                          expr: GenericUDFGeoIP ( client_ip,
>>> 'COUNTRY_CODE', './GeoIP.dat' )
>>>                          type: string
>>>                          expr: ''
>>>                          type: string
>>>                          expr: ds
>>>                          type: string
>>>                    outputColumnNames: _col0, _col1, _col2, _col3,
>>> _col4, _col5, _col6, _col7, _col8, _col9, _col10
>>>                    File Output Operator
>>>                      compressed: true
>>>                      GlobalTableId: 1
>>>                      table:
>>>                          input format:
>>> org.apache.hadoop.mapred.TextInputFormat
>>>                          output format:
>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>                          serde:
>>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>                          name: hbase_prefilter3_us_sample
>>>
>>>  Stage: Stage-5
>>>    Conditional Operator
>>>
>>>  Stage: Stage-4
>>>    Move Operator
>>>      files:
>>>          hdfs directory: true
>>>          destination:
>>>
>>> hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000
>>>
>>>  Stage: Stage-0
>>>    Move Operator
>>>      tables:
>>>          partition:
>>>            ds
>>>          replace: true
>>>          table:
>>>              input format: org.apache.hadoop.mapred.TextInputFormat
>>>              output format:
>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>              name: hbase_prefilter3_us_sample
>>>
>>>  Stage: Stage-2
>>>    Stats-Aggr Operator
>>>
>>>  Stage: Stage-3
>>>    Map Reduce
>>>      Alias -> Map Operator Tree:
>>>
>>>  hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
>>>            File Output Operator
>>>              compressed: true
>>>              GlobalTableId: 0
>>>              table:
>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>                  output format:
>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>>>                  name: hbase_prefilter3_us_sample
>>>
>>>
>>>
>>>
>>> On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nz...@fb.com> wrote:
>>> > I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need
>>> > to be there for merging to take place. HIVE-1307 was committed to trunk on
>>> > 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update
>>> > your Hive trunk and rerun the query. If it still doesn't work maybe you can
>>> > post your query and the result of 'explain <query>' and we can take a look.
>>> >
>>> > Ning
>>> >
>>> > On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>>> >
>>> >> Hi Ning,
>>> >> For the dataset I'm experimenting with, the total size of the output
>>> >> is 2mb, and the files are at most a few kb in size.  My
>>> >> hive.input.format was set to default HiveInputFormat; however, when I
>>> >> set it to CombineHiveInputFormat, it only made the first stage of the
>>> >> job use fewer mappers.  The merge job was *still* filtered out at
>>> >> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>>> >> have any effect.
>>> >>
>>> >> I am a bit at a loss what to do here.  Is there a way to see what's
>>> >> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>>> >> dynamic partitions; could that somehow be interfering with the merge
>>> >> job?..
>>> >>
>>> >> I'm running a relatively fresh Hive from trunk (built maybe a month
>>> >> ago).
>>> >>
>>> >> --Leo
>>> >>
>>> >> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nz...@fb.com> wrote:
>>> >>> The settings looks good. The parameter
>>> >>> hive.merge.size.smallfiles.avgsize is used to determine at run time if a
>>> >>> merge should be triggered: if the average size of the files in the partition
>>> >>> is SMALLER than the parameter and there are more than 1 file, the merge
>>> >>> should be scheduled. Can you try to see if you have any big files as well in
>>> >>> your resulting partition? If it is because of a very large file, you can set
>>> >>> the parameter large enough.
>>> >>>
>>> >>> Another possibility is that your Hadoop installation does not support
>>> >>> CombineHiveInputFormat, which is used for the new merge job. Someone
>>> >>> reported previously merge was not successful because of this. If that's the
>>> >>> case, you can turn off CombineHiveInputFormat and use the old
>>> >>> HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>> >>>
>>> >>> Ning
>>> >>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>> >>>
>>> >>>> I have jobs that sample (or generate) a small amount of data from a
>>> >>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>> >>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>> >>>> to merge the output?  I have the following settings:
>>> >>>>
>>> >>>> hive.merge.mapfiles=true
>>> >>>> hive.merge.mapredfiles=true
>>> >>>> hive.merge.size.per.task=256000000
>>> >>>> hive.merge.size.smallfiles.avgsize=16000000
>>> >>>>
>>> >>>> After setting hive.merge* to true, Hive started indicating "Total
>>> >>>> MapReduce jobs = 2".  However, after generating the
>>> >>>> lots-of-small-files table, Hive says:
>>> >>>> Ended Job = job_201011021934_1344
>>> >>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>> >>>>
>>> >>>> Is there a way to force the merge, or am I missing something?
>>> >>>> --Leo
>>> >>>
>>> >>>
>>> >
>>> >
>>
>>
>>
>> --
>> Dave Brondsema
>> Software Engineer
>> Geeknet
>>
>> www.geek.net
>>
>

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Leo Alekseyev <dn...@gmail.com>.
I'm using Hadoop 0.20.2.  Merge jobs (with static partitions) have
worked for me in the past.  Again, what's strange here is that with
the latest Hive build the merge stage appears to run but doesn't
actually merge: it's a quick map-only job that, as far as I can tell,
doesn't do anything.
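
For reference, the trigger rule described in this thread (schedule a
merge when there is more than one file and the average file size is
below hive.merge.size.smallfiles.avgsize) can be sketched as below.
This is only an illustration of the rule as stated here, not Hive's
actual code; the function and variable names are made up.

```python
# Sketch of the merge trigger rule as described in this thread.
# Names are illustrative assumptions, not Hive internals.

def should_merge(file_sizes, avg_size_threshold):
    """Return True when the stated rule would schedule a merge:
    more than one file, and average file size below the threshold."""
    if len(file_sizes) <= 1:
        return False
    average = sum(file_sizes) / len(file_sizes)
    return average < avg_size_threshold


# Values taken from the thread: ~3000 files of ~1kb each,
# hive.merge.size.smallfiles.avgsize=16000000.
AVGSIZE = 16_000_000
small_files = [1024] * 3000

print(should_merge(small_files, AVGSIZE))
```

By this rule the output here qualifies for a merge, which suggests the
threshold settings themselves are not what prevents the files from
being combined.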

On Fri, Nov 19, 2010 at 6:14 AM, Dave Brondsema <db...@geek.net> wrote:
> What version of Hadoop are you on?
>
> --
> Dave Brondsema
> Software Engineer
> Geeknet
>
> www.geek.net
>

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Dave Brondsema <db...@geek.net>.
What version of Hadoop are you on?


-- 
Dave Brondsema
Software Engineer
Geeknet

www.geek.net

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Leo Alekseyev <dn...@gmail.com>.
I thought I was running Hive with those changes merged in, but to make
sure, I built the latest trunk version.  The behavior changed somewhat
(it now runs 2 stages instead of 1), but it still generates the same
number of files.  The number of files generated equals the number of
original mappers, so I have no idea what the second stage is actually
doing.

See below for the query and its explain output.  Stage 1 always runs;
Stage 3 runs if hive.merge.mapfiles=true is set, but the result is
still lots of small files.

The query is kind of large, but in essence it's simply
insert overwrite table foo partition(bar) select [columns] from
[table] tablesample(bucket 1 out of 10000 on rand()) where
[conditions].
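
As a rough illustration of why each mapper ends up writing so little:
the plan below shows the sampling clause compiling to the predicate
(((hash(rand()) & 2147483647) % 10000) = 0).  The sketch mimics that
predicate in Python.  Hive's hash function is not reproduced; random
32-bit integers stand in for hash(rand()), and the generalisation to
other bucket numbers (bucket - 1) is an assumption.

```python
import random

def in_sample_bucket(hash_value, bucket=1, num_buckets=10000):
    """Mimic the plan's predicate: clear the sign bit, take the value
    modulo the bucket count, and compare with the zero-based bucket."""
    non_negative = hash_value & 2147483647
    return non_negative % num_buckets == bucket - 1


# Stand-in for hash(rand()): random 32-bit integers (an assumption).
rng = random.Random(0)
rows_per_mapper = 1_000_000
kept = sum(
    in_sample_bucket(rng.getrandbits(32)) for _ in range(rows_per_mapper)
)

# Roughly 1 row in 10000 survives, so each mapper writes a tiny file.
print(f"kept {kept} of {rows_per_mapper} rows")
```

With a map-only insert, each mapper writes its own output file, so a
1-in-10000 sample over many mappers yields many very small files
unless a merge step combines them.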


explain insert overwrite table hbase_prefilter3_us_sample partition
(ds) select server_host,client_ip,time_stamp,concat(server_host,':',regexp_extract(request_url,'/[^/]+/[^/]+/([^/]+)$',1)),referrer,parse_url(referrer,'HOST'),user_agent,cookie,geoip_int(client_ip,
'COUNTRY_CODE',  './GeoIP.dat'),'',ds from alogs_master
TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rand()) am_s where
am_s.ds='2010-11-05' and am_s.request_url rlike
'^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$' and
geoip_int(am_s.client_ip, 'COUNTRY_CODE',  './GeoIP.dat')='US';
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF alogs_master (TOK_TABLESAMPLE 1
10000 (TOK_FUNCTION rand)) am_s)) (TOK_INSERT (TOK_DESTINATION
(TOK_TAB hbase_prefilter3_us_sample (TOK_PARTSPEC (TOK_PARTVAL ds))))
(TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL server_host)) (TOK_SELEXPR
(TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_TABLE_OR_COL
time_stamp)) (TOK_SELEXPR (TOK_FUNCTION concat (TOK_TABLE_OR_COL
server_host) ':' (TOK_FUNCTION regexp_extract (TOK_TABLE_OR_COL
request_url) '/[^/]+/[^/]+/([^/]+)$' 1))) (TOK_SELEXPR
(TOK_TABLE_OR_COL referrer)) (TOK_SELEXPR (TOK_FUNCTION parse_url
(TOK_TABLE_OR_COL referrer) 'HOST')) (TOK_SELEXPR (TOK_TABLE_OR_COL
user_agent)) (TOK_SELEXPR (TOK_TABLE_OR_COL cookie)) (TOK_SELEXPR
(TOK_FUNCTION geoip_int (TOK_TABLE_OR_COL client_ip) 'COUNTRY_CODE'
'./GeoIP.dat')) (TOK_SELEXPR '') (TOK_SELEXPR (TOK_TABLE_OR_COL ds)))
(TOK_WHERE (and (and (= (. (TOK_TABLE_OR_COL am_s) ds) '2010-11-05')
(rlike (. (TOK_TABLE_OR_COL am_s) request_url)
'^/img[0-9]+/[0-9]+/[^.]+\.(png|jpg|gif|mp4|swf)$')) (= (TOK_FUNCTION
geoip_int (. (TOK_TABLE_OR_COL am_s) client_ip) 'COUNTRY_CODE'
'./GeoIP.dat') 'US')))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
  Stage-4
  Stage-0 depends on stages: Stage-4, Stage-3
  Stage-2 depends on stages: Stage-0
  Stage-3

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        am_s
          TableScan
            alias: am_s
            Filter Operator
              predicate:
                  expr: (((hash(rand()) & 2147483647) % 10000) = 0)
                  type: boolean
              Filter Operator
                predicate:
                    expr: ((request_url rlike
'^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$') and
(GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
                    type: boolean
                Filter Operator
                  predicate:
                      expr: (((ds = '2010-11-05') and (request_url
rlike '^/img[0-9]+/[0-9]+/[^.]+.(png|jpg|gif|mp4|swf)$')) and
(GenericUDFGeoIP ( client_ip, 'COUNTRY_CODE', './GeoIP.dat' ) = 'US'))
                      type: boolean
                  Select Operator
                    expressions:
                          expr: server_host
                          type: string
                          expr: client_ip
                          type: int
                          expr: time_stamp
                          type: int
                          expr: concat(server_host, ':',
regexp_extract(request_url, '/[^/]+/[^/]+/([^/]+)$', 1))
                          type: string
                          expr: referrer
                          type: string
                          expr: parse_url(referrer, 'HOST')
                          type: string
                          expr: user_agent
                          type: string
                          expr: cookie
                          type: string
                          expr: GenericUDFGeoIP ( client_ip,
'COUNTRY_CODE', './GeoIP.dat' )
                          type: string
                          expr: ''
                          type: string
                          expr: ds
                          type: string
                    outputColumnNames: _col0, _col1, _col2, _col3,
_col4, _col5, _col6, _col7, _col8, _col9, _col10
                    File Output Operator
                      compressed: true
                      GlobalTableId: 1
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: hbase_prefilter3_us_sample

  Stage: Stage-5
    Conditional Operator

  Stage: Stage-4
    Move Operator
      files:
          hdfs directory: true
          destination:
hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10000

  Stage: Stage-0
    Move Operator
      tables:
          partition:
            ds
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: hbase_prefilter3_us_sample

  Stage: Stage-2
    Stats-Aggr Operator

  Stage: Stage-3
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://namenode.imageshack.us:9000/tmp/hive-hadoop/hive_2010-11-18_17-58-36_843_6726655151866456030/-ext-10002
            File Output Operator
              compressed: true
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: hbase_prefilter3_us_sample
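
The Stage-1 sampling filter in the plan above, (((hash(rand()) & 2147483647) % 10000) = 0), can be sketched in plain Java as below. This is only an illustration: the class and method names are hypothetical, and java.util.Random stands in for hash(rand()) rather than reproducing Hive's hash function. It shows why each mapper keeps roughly one row in 10000, which is presumably why every mapper writes a tiny output file.

```java
import java.util.Random;

final class BucketSampleSketch {
    // Mirrors the plan's expression ((hash & 2147483647) % numBuckets).
    // Masking with Integer.MAX_VALUE clears the sign bit, so the remainder is never negative.
    static int bucketOf(int hash, int numBuckets) {
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    // "BUCKET 1 OUT OF n" corresponds to remainder 0 in the plan's filter.
    static boolean inFirstBucket(int hash, int numBuckets) {
        return bucketOf(hash, numBuckets) == 0;
    }

    public static void main(String[] args) {
        Random rng = new Random(42);
        int rows = 1_000_000;
        int kept = 0;
        for (int i = 0; i < rows; i++) {
            // Stand-in for hash(rand()); Hive's real hash is not reproduced here.
            if (inFirstBucket(rng.nextInt(), 10000)) {
                kept++;
            }
        }
        System.out.println("kept " + kept + " of " + rows + " rows");
    }
}
```

With a uniform hash the kept fraction should hover around 1/10000, so a mapper that reads a few hundred thousand rows emits only a handful of them.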




On Thu, Nov 18, 2010 at 3:44 PM, Ning Zhang <nz...@fb.com> wrote:
> I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need to be there for merging to take place. HIVE-1307 was committed to trunk on 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update your Hive trunk and rerun the query. If it still doesn't work maybe you can post your query and the result of 'explain <query>' and we can take a look.
>
> Ning
>
> On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:
>
>> Hi Ning,
>> For the dataset I'm experimenting with, the total size of the output
>> is 2mb, and the files are at most a few kb in size.  My
>> hive.input.format was set to default HiveInputFormat; however, when I
>> set it to CombineHiveInputFormat, it only made the first stage of the
>> job use fewer mappers.  The merge job was *still* filtered out at
>> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
>> have any effect.
>>
>> I am a bit at a loss what to do here.  Is there a way to see what's
>> going on exactly using e.g. debug log levels?..  Btw, I'm also using
>> dynamic partitions; could that somehow be interfering with the merge
>> job?..
>>
>> I'm running a relatively fresh Hive from trunk (built maybe a month ago).
>>
>> --Leo
>>
>> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nz...@fb.com> wrote:
>>> The settings look good. The parameter hive.merge.size.smallfiles.avgsize is used to determine at run time if a merge should be triggered: if the average size of the files in the partition is SMALLER than the parameter and there is more than one file, the merge should be scheduled. Can you try to see if you have any big files as well in your resulting partition? If it is because of a very large file, you can set the parameter large enough.
>>>
>>> Another possibility is that your Hadoop installation does not support CombineHiveInputFormat, which is used for the new merge job. Someone reported previously merge was not successful because of this. If that's the case, you can turn off CombineHiveInputFormat and use the old HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>>>
>>> Ning
>>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>>>
>>>> I have jobs that sample (or generate) a small amount of data from a
>>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>>> to merge the output?  I have the following settings:
>>>>
>>>> hive.merge.mapfiles=true
>>>> hive.merge.mapredfiles=true
>>>> hive.merge.size.per.task=256000000
>>>> hive.merge.size.smallfiles.avgsize=16000000
>>>>
>>>> After setting hive.merge* to true, Hive started indicating "Total
>>>> MapReduce jobs = 2".  However, after generating the
>>>> lots-of-small-files table, Hive says:
>>>> Ended Job = job_201011021934_1344
>>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>>>
>>>> Is there a way to force the merge, or am I missing something?
>>>> --Leo
>>>
>>>
>
>

Re: Using jdbc in embedded mode - Can't find warehouse directory [SOLVED]

Posted by Stuart Smith <st...@yahoo.com>.
Hello Shrijeet,

Yup. I already moved it over & it seems to work.
Moving it into hive conf was just a quick hack/test.

(the wiki has you setting hbase quorum vars via hive conf, so I assumed it wasn't *too* hackilicious).

It would be nice to have hadoop in the classpath covered on the wiki though, along with one more piece of information: 

For some odd reason, no mention is made of the hadoop.bin.path variable (a hive conf var). It's set via the HADOOP_HOME environment variable (which is mentioned on the wiki), in HiveConf.java.

Unfortunately, setting up env vars gets a little tricky in my tomcat test server running in eclipse :)

So I just set the hadoop.bin.path directly in the hive conf. Much easier than trying to muck around with environment variables.

I'd be glad to help with docs if anyone is game.

It could save someone the trouble ( fun ;) ) of grepping through source code, ripping out source from the cli client, creating a harness, tracking down code flow, etc... all to just understand how configuration variables are being used, and how to set them correctly...

At least it's on the mailing list now, I suppose..

Take care,
   -stu

--- On Fri, 11/19/10, Shrijeet Paliwal <sh...@rocketfuel.com> wrote:

> From: Shrijeet Paliwal <sh...@rocketfuel.com>
> Subject: Re: Using jdbc in embedded mode - Can't find warehouse directory [SOLVED]
> To: user@hive.apache.org
> Date: Friday, November 19, 2010, 8:30 PM
> I would say your hadoop configuration file(s) should have been on your
> class-path (core-site.xml in this case) . You are not supposed to put
> hadoop parameters into hive conf files.
> 
> -Shrijeet
> 
> 
> On Fri, Nov 19, 2010 at 4:57 PM, Stuart Smith <st...@yahoo.com> wrote:
> >
> > Hello,
> >
> >  Just wanted to let people know I tracked this one down:
> >
> > It looks like it was not picking up the *hadoop* core-site.xml configuration file.
> >
> > - So the variable fs.default.name was never set
> >
> > - So the warehouse dir became file://[hive.metastore.warehouse.dir] instead of [hdfs location]/[hive.metastore.warehouse.dir]
> >
> > - So it couldn't find any of the warehouse files.
> >
> > - So the metastore queries would start to work, but the metastore couldn't find any of the backing files on hdfs.
> >
> > It was picking up the hive configuration, so I just plopped the fs.default.name property from hdfs-site.xml into the hive configuration.
> >
> > Should the jdbc wiki:
> >
> > http://wiki.apache.org/hadoop/Hive/HiveClient#head-fd2d8ae9e17fdc3d9b7048d088b2c23a53a6857d
> >
> > Be updated to include this information?
> >
> > It could be useful to anyone trying to use an embedded server (vs the example given). I would actually think this would apply to the standalone case as well, but I haven't tried it yet.
> >
> > My particular use case is using the jdbc connector in a java servlet (specifically, a GWT server-side RPC implementation).
> >
> > As an aside: is the hive jdbc connector thread-safe?
> > Assuming I instantiate within the callback method?
> > (I would think having a class Connection member would not be thread safe?).
> >
> > I'd be happy to help update the wiki & come up with an example, if that would help..
> >
> > Take care,
> >  -stu
> >
> >
> > --- On Thu, 11/18/10, Stuart Smith <st...@yahoo.com> wrote:
> >
> >> From: Stuart Smith <st...@yahoo.com>
> >> Subject: Using jdbc in embedded mode - Can't find warehouse directory
> >> To: user@hive.apache.org
> >> Date: Thursday, November 18, 2010, 7:46 PM
> >>
> >> Hello,
> >>
> >>   I'm trying to connect to hive using the JDBC driver
> >> in embedded mode. I can load the driver successfully &
> >> connect to it via:
> >>
> >> hiveConnection = DriverManager.getConnection(
> >> "jdbc:hive://", "", "" )
> >>
> >> But when I query a table that I know exists - I can query
> >> it via a hive command line running on the same machine - I
> >> get a "table does not exist" error. When I go ahead and
> >> create the table in my java program, and then query it, I
> >> get:
> >>
> >> ERROR: hive.log java.io.FileNotFoundException: File
> >> file:/user/hive/warehouse/[table_name]
> >>         at
> >> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:361)
> >> ...
> >>
> >> So it looks like it's trying to use the local filesystem
> >> for the warehouse dir. I tried setting the warehouse dir
> >> variable in the hive-default.xml file to:
> >>
> >> hdfs://user/hive/warehouse/
> >>
> >> But I get the same errors.
> >>
> >> Any idea what's happening?
> >>
> >> Am I confused on what an embedded hive server can do - I
> >> was under the impression that the cli used an embedded hive
> >> server, and could connect to my hdfs store, but... it would
> >> seem my java program can't do this.
> >>
> >> I guess my next stop is going through the hive cli source
> >> code ?
> >>
> >> Take care,
> >>   -stu
> >>
> >>
> >>
> >>
> >
> >
> >
> >
> 


      

Re: Using jdbc in embedded mode - Can't find warehouse directory [SOLVED]

Posted by Shrijeet Paliwal <sh...@rocketfuel.com>.
I would say your hadoop configuration file(s) should have been on your
class-path (core-site.xml in this case) . You are not supposed to put
hadoop parameters into hive conf files.

-Shrijeet


On Fri, Nov 19, 2010 at 4:57 PM, Stuart Smith <st...@yahoo.com> wrote:
>
> Hello,
>
>  Just wanted to let people know I tracked this one down:
>
> It looks like it was not picking up the *hadoop* core-site.xml configuration file.
>
> - So the variable fs.default.name was never set
>
> - So the warehouse dir became file://[hive.metastore.warehouse.dir] instead of [hdfs location]/[hive.metastore.warehouse.dir]
>
> - So it couldn't find any of the warehouse files.
>
> - So the metastore queries would start to work, but the metastore couldn't find any of the backing files on hdfs.
>
> It was picking up the hive configuration, so I just plopped the fs.default.name property from hdfs-site.xml into the hive configuration.
>
> Should the jdbc wiki:
>
> http://wiki.apache.org/hadoop/Hive/HiveClient#head-fd2d8ae9e17fdc3d9b7048d088b2c23a53a6857d
>
> Be updated to include this information?
>
> It could be useful to anyone trying to use an embedded server (vs the example given). I would actually think this would apply to the standalone case as well, but I haven't tried it yet.
>
> My particular use case is using the jdbc connector in a java servlet (specifically, a GWT server-side RPC implementation).
>
> As an aside: is the hive jdbc connector thread-safe?
> Assuming I instantiate within the callback method?
> (I would think having a class Connection member would not be thread safe?).
>
> I'd be happy to help update the wiki & come up with an example, if that would help..
>
> Take care,
>  -stu
>
>
> --- On Thu, 11/18/10, Stuart Smith <st...@yahoo.com> wrote:
>
>> From: Stuart Smith <st...@yahoo.com>
>> Subject: Using jdbc in embedded mode - Can't find warehouse directory
>> To: user@hive.apache.org
>> Date: Thursday, November 18, 2010, 7:46 PM
>>
>> Hello,
>>
>>   I'm trying to connect to hive using the JDBC driver
>> in embedded mode. I can load the driver successfully &
>> connect to it via:
>>
>> hiveConnection = DriverManager.getConnection(
>> "jdbc:hive://", "", "" )
>>
>> But when I query a table that I know exists - I can query
>> it via a hive command line running on the same machine - I
>> get a "table does not exist" error. When I go ahead and
>> create the table in my java program, and then query it, I
>> get:
>>
>> ERROR: hive.log java.io.FileNotFoundException: File
>> file:/user/hive/warehouse/[table_name]
>>         at
>> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:361)
>> ...
>>
>> So it looks like it's trying to use the local filesystem
>> for the warehouse dir. I tried setting the warehouse dir
>> variable in the hive-default.xml file to:
>>
>> hdfs://user/hive/warehouse/
>>
>> But I get the same errors.
>>
>> Any idea what's happening?
>>
>> Am I confused on what an embedded hive server can do - I
>> was under the impression that the cli used an embedded hive
>> server, and could connect to my hdfs store, but... it would
>> seem my java program can't do this.
>>
>> I guess my next stop is going through the hive cli source
>> code ?
>>
>> Take care,
>>   -stu
>>
>>
>>
>>
>
>
>
>

Re: Using jdbc in embedded mode - Can't find warehouse directory [SOLVED]

Posted by Stuart Smith <st...@yahoo.com>.
Hello,

  Just wanted to let people know I tracked this one down:

It looks like it was not picking up the *hadoop* core-site.xml configuration file. 

- So the variable fs.default.name was never set

- So the warehouse dir became file://[hive.metastore.warehouse.dir] instead of [hdfs location]/[hive.metastore.warehouse.dir]

- So it couldn't find any of the warehouse files.

- So the metastore queries would start to work, but the metastore couldn't find any of the backing files on hdfs.

It was picking up the hive configuration, so I just plopped the fs.default.name property from hdfs-site.xml into the hive configuration.
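
To make the chain of failures above concrete, here is a minimal sketch (not Hive's or Hadoop's actual code) of how a scheme-less warehouse path lands on the local filesystem when fs.default.name is never loaded. The class name, method name, and example namenode address are hypothetical; the one assumption borrowed from Hadoop is that the default filesystem falls back to file:/// when no site configuration is found on the classpath.

```java
import java.net.URI;

final class WarehousePathSketch {
    // Assumed fallback when fs.default.name was never read from core-site.xml.
    static final String FALLBACK_FS = "file:///";

    // Qualifies a warehouse directory against the default filesystem URI.
    // A directory that already carries a scheme is returned unchanged.
    static URI qualify(String defaultFs, String warehouseDir) {
        URI dir = URI.create(warehouseDir);
        if (dir.getScheme() != null) {
            return dir;
        }
        URI fs = URI.create(defaultFs == null ? FALLBACK_FS : defaultFs);
        String authority = fs.getAuthority() == null ? "" : fs.getAuthority();
        return URI.create(fs.getScheme() + "://" + authority + dir.getPath());
    }

    public static void main(String[] args) {
        // fs.default.name missing: the warehouse resolves to the local disk.
        System.out.println(qualify(null, "/user/hive/warehouse"));
        // fs.default.name present: the same path resolves to HDFS.
        System.out.println(qualify("hdfs://namenode.example.com:9000", "/user/hive/warehouse"));
    }
}
```

The first call corresponds to the file:/user/hive/warehouse/[table_name] path in the FileNotFoundException; the second is what the embedded driver should see once the Hadoop configuration is on the classpath.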

Should the jdbc wiki:

http://wiki.apache.org/hadoop/Hive/HiveClient#head-fd2d8ae9e17fdc3d9b7048d088b2c23a53a6857d

Be updated to include this information?

It could be useful to anyone trying to use an embedded server (vs the example given). I would actually think this would apply to the standalone case as well, but I haven't tried it yet.

My particular use case is using the jdbc connector in a java servlet (specifically, a GWT server-side RPC implementation).

As an aside: is the hive jdbc connector thread-safe? 
Assuming I instantiate within the callback method?
(I would think having a class Connection member would not be thread safe?).

I'd be happy to help update the wiki & come up with an example, if that would help..

Take care,
  -stu


--- On Thu, 11/18/10, Stuart Smith <st...@yahoo.com> wrote:

> From: Stuart Smith <st...@yahoo.com>
> Subject: Using jdbc in embedded mode - Can't find warehouse directory
> To: user@hive.apache.org
> Date: Thursday, November 18, 2010, 7:46 PM
> 
> Hello,
> 
>   I'm trying to connect to hive using the JDBC driver
> in embedded mode. I can load the driver successfully &
> connect to it via:
> 
> hiveConnection = DriverManager.getConnection(
> "jdbc:hive://", "", "" )
> 
> But when I query a table that I know exists - I can query
> it via a hive command line running on the same machine - I
> get a "table does not exist" error. When I go ahead and
> create the table in my java program, and then query it, I
> get: 
> 
> ERROR: hive.log java.io.FileNotFoundException: File
> file:/user/hive/warehouse/[table_name]
>         at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:361)
> ...
> 
> So it looks like it's trying to use the local filesystem
> for the warehouse dir. I tried setting the warehouse dir
> variable in the hive-default.xml file to:
> 
> hdfs://user/hive/warehouse/
> 
> But I get the same errors.
> 
> Any idea what's happening? 
> 
> Am I confused on what an embedded hive server can do - I
> was under the impression that the cli used an embedded hive
> server, and could connect to my hdfs store, but... it would
> seem my java program can't do this.
> 
> I guess my next stop is going through the hive cli source
> code ?
> 
> Take care,
>   -stu
> 
> 
>       
> 


      

Using jdbc in embedded mode - Can't find warehouse directory

Posted by Stuart Smith <st...@yahoo.com>.
Hello,

  I'm trying to connect to hive using the JDBC driver in embedded mode. I can load the driver successfully & connect to it via:

hiveConnection = DriverManager.getConnection( "jdbc:hive://", "", "" )

But when I query a table that I know exists - I can query it via a hive command line running on the same machine - I get a "table does not exist" error. When I go ahead and create the table in my java program, and then query it, I get: 

ERROR: hive.log java.io.FileNotFoundException: File file:/user/hive/warehouse/[table_name]
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:361)
...

So it looks like it's trying to use the local filesystem for the warehouse dir. I tried setting the warehouse dir variable in the hive-default.xml file to:

hdfs://user/hive/warehouse/

But I get the same errors.
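
A side note on the value tried above: with only two slashes, a generic URI parser treats "user" as the host rather than as the first path component. The sketch below uses java.net.URI to show this; the class and helper names are hypothetical, and it says nothing about whether Hive read the edited hive-default.xml at all.

```java
import java.net.URI;

final class HdfsUriParseSketch {
    // Returns the host component of the URI, or null if the authority is empty.
    static String hostOf(String uri) {
        return URI.create(uri).getHost();
    }

    // Returns the path component of the URI.
    static String pathOf(String uri) {
        return URI.create(uri).getPath();
    }

    public static void main(String[] args) {
        String twoSlashes = "hdfs://user/hive/warehouse/";
        String threeSlashes = "hdfs:///user/hive/warehouse/";
        // Two slashes: "user" is parsed as the namenode host.
        System.out.println(hostOf(twoSlashes) + " " + pathOf(twoSlashes));
        // Three slashes: empty authority, full path preserved.
        System.out.println(hostOf(threeSlashes) + " " + pathOf(threeSlashes));
    }
}
```

So even if the setting had been picked up, the two-slash form would likely have pointed at a namenode called "user" and at /hive/warehouse/ rather than /user/hive/warehouse/.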

Any idea what's happening? 

Am I confused on what an embedded hive server can do - I was under the impression that the cli used an embedded hive server, and could connect to my hdfs store, but... it would seem my java program can't do this.

I guess my next stop is going through the hive cli source code ?

Take care,
  -stu


      

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Ning Zhang <nz...@fb.com>.
I see. If you are using dynamic partitions, HIVE-1307 and HIVE-1622 need to be there for merging to take place. HIVE-1307 was committed to trunk on 08/25 and HIVE-1622 was committed on 09/13. The simplest way is to update your Hive trunk and rerun the query. If it still doesn't work maybe you can post your query and the result of 'explain <query>' and we can take a look. 

Ning

On Nov 18, 2010, at 2:57 PM, Leo Alekseyev wrote:

> Hi Ning,
> For the dataset I'm experimenting with, the total size of the output
> is 2mb, and the files are at most a few kb in size.  My
> hive.input.format was set to default HiveInputFormat; however, when I
> set it to CombineHiveInputFormat, it only made the first stage of the
> job use fewer mappers.  The merge job was *still* filtered out at
> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
> have any effect.
> 
> I am a bit at a loss what to do here.  Is there a way to see what's
> going on exactly using e.g. debug log levels?..  Btw, I'm also using
> dynamic partitions; could that somehow be interfering with the merge
> job?..
> 
> I'm running a relatively fresh Hive from trunk (built maybe a month ago).
> 
> --Leo
> 
> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nz...@fb.com> wrote:
>> The settings look good. The parameter hive.merge.size.smallfiles.avgsize is used to determine at run time if a merge should be triggered: if the average size of the files in the partition is SMALLER than the parameter and there is more than one file, the merge should be scheduled. Can you try to see if you have any big files as well in your resulting partition? If it is because of a very large file, you can set the parameter large enough.
>> 
>> Another possibility is that your Hadoop installation does not support CombineHiveInputFormat, which is used for the new merge job. Someone reported previously merge was not successful because of this. If that's the case, you can turn off CombineHiveInputFormat and use the old HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>> 
>> Ning
>> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>> 
>>> I have jobs that sample (or generate) a small amount of data from a
>>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>>> to merge the output?  I have the following settings:
>>> 
>>> hive.merge.mapfiles=true
>>> hive.merge.mapredfiles=true
>>> hive.merge.size.per.task=256000000
>>> hive.merge.size.smallfiles.avgsize=16000000
>>> 
>>> After setting hive.merge* to true, Hive started indicating "Total
>>> MapReduce jobs = 2".  However, after generating the
>>> lots-of-small-files table, Hive says:
>>> Ended Job = job_201011021934_1344
>>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>> 
>>> Is there a way to force the merge, or am I missing something?
>>> --Leo
>> 
>> 


Re: Hive produces very small files despite hive.merge...=true settings

Posted by Ted Yu <yu...@gmail.com>.
Leo:
You may find this helpful:
http://indoos.wordpress.com/2010/06/24/hive-remote-debugging/

On Thu, Nov 18, 2010 at 2:57 PM, Leo Alekseyev <dn...@gmail.com> wrote:

> Hi Ning,
> For the dataset I'm experimenting with, the total size of the output
> is 2mb, and the files are at most a few kb in size.  My
> hive.input.format was set to default HiveInputFormat; however, when I
> set it to CombineHiveInputFormat, it only made the first stage of the
> job use fewer mappers.  The merge job was *still* filtered out at
> runtime.  I also tried set hive.mergejob.maponly=false; that didn't
> have any effect.
>
> I am a bit at a loss what to do here.  Is there a way to see what's
> going on exactly using e.g. debug log levels?..  Btw, I'm also using
> dynamic partitions; could that somehow be interfering with the merge
> job?..
>
> I'm running a relatively fresh Hive from trunk (built maybe a month ago).
>
> --Leo
>
> On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nz...@fb.com> wrote:
> > The settings look good. The parameter hive.merge.size.smallfiles.avgsize is used to determine at run time if a merge should be triggered: if the average size of the files in the partition is SMALLER than the parameter and there is more than one file, the merge should be scheduled. Can you try to see if you have any big files as well in your resulting partition? If it is because of a very large file, you can set the parameter large enough.
> >
> > Another possibility is that your Hadoop installation does not support CombineHiveInputFormat, which is used for the new merge job. Someone reported previously merge was not successful because of this. If that's the case, you can turn off CombineHiveInputFormat and use the old HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
> >
> > Ning
> > On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
> >
> >> I have jobs that sample (or generate) a small amount of data from a
> >> large table.  At the end, I get e.g. about 3000 or more files of 1kb
> >> or so.  This becomes a nuisance.  How can I make Hive do another pass
> >> to merge the output?  I have the following settings:
> >>
> >> hive.merge.mapfiles=true
> >> hive.merge.mapredfiles=true
> >> hive.merge.size.per.task=256000000
> >> hive.merge.size.smallfiles.avgsize=16000000
> >>
> >> After setting hive.merge* to true, Hive started indicating "Total
> >> MapReduce jobs = 2".  However, after generating the
> >> lots-of-small-files table, Hive says:
> >> Ended Job = job_201011021934_1344
> >> Ended Job = 781771542, job is filtered out (removed at runtime).
> >>
> >> Is there a way to force the merge, or am I missing something?
> >> --Leo
> >
> >
>

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Leo Alekseyev <dn...@gmail.com>.
Hi Ning,
For the dataset I'm experimenting with, the total size of the output
is 2mb, and the files are at most a few kb in size.  My
hive.input.format was set to default HiveInputFormat; however, when I
set it to CombineHiveInputFormat, it only made the first stage of the
job use fewer mappers.  The merge job was *still* filtered out at
runtime.  I also tried set hive.mergejob.maponly=false; that didn't
have any effect.

I am a bit at a loss what to do here.  Is there a way to see what's
going on exactly using e.g. debug log levels?..  Btw, I'm also using
dynamic partitions; could that somehow be interfering with the merge
job?..

I'm running a relatively fresh Hive from trunk (built maybe a month ago).

--Leo

On Thu, Nov 18, 2010 at 1:12 PM, Ning Zhang <nz...@fb.com> wrote:
> The settings look good. The parameter hive.merge.size.smallfiles.avgsize is used to determine at run time if a merge should be triggered: if the average size of the files in the partition is SMALLER than the parameter and there is more than one file, the merge should be scheduled. Can you try to see if you have any big files as well in your resulting partition? If it is because of a very large file, you can set the parameter large enough.
>
> Another possibility is that your Hadoop installation does not support CombineHiveInputFormat, which is used for the new merge job. Someone reported previously merge was not successful because of this. If that's the case, you can turn off CombineHiveInputFormat and use the old HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.
>
> Ning
> On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:
>
>> I have jobs that sample (or generate) a small amount of data from a
>> large table.  At the end, I get e.g. about 3000 or more files of 1kb
>> or so.  This becomes a nuisance.  How can I make Hive do another pass
>> to merge the output?  I have the following settings:
>>
>> hive.merge.mapfiles=true
>> hive.merge.mapredfiles=true
>> hive.merge.size.per.task=256000000
>> hive.merge.size.smallfiles.avgsize=16000000
>>
>> After setting hive.merge* to true, Hive started indicating "Total
>> MapReduce jobs = 2".  However, after generating the
>> lots-of-small-files table, Hive says:
>> Ended Job = job_201011021934_1344
>> Ended Job = 781771542, job is filtered out (removed at runtime).
>>
>> Is there a way to force the merge, or am I missing something?
>> --Leo
>
>

Re: Hive produces very small files despite hive.merge...=true settings

Posted by Ning Zhang <nz...@fb.com>.
The settings look good. The parameter hive.merge.size.smallfiles.avgsize is used to determine at run time if a merge should be triggered: if the average size of the files in the partition is SMALLER than the parameter and there is more than one file, the merge should be scheduled. Can you try to see if you have any big files as well in your resulting partition? If it is because of a very large file, you can set the parameter large enough.
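
The rule described above can be sketched as follows. This is an illustration of the stated condition only, not the code Hive runs for its conditional merge stage; the class and method names are hypothetical, and the file sizes are made-up inputs modelled on the numbers in this thread.

```java
final class MergeDecisionSketch {
    // Merge is scheduled when there is more than one file and the average
    // file size is smaller than hive.merge.size.smallfiles.avgsize.
    static boolean shouldMerge(long[] fileSizes, long avgSizeThreshold) {
        if (fileSizes.length <= 1) {
            return false;
        }
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        return total / fileSizes.length < avgSizeThreshold;
    }

    public static void main(String[] args) {
        long threshold = 16_000_000L; // hive.merge.size.smallfiles.avgsize from the original post

        // About 3000 files of roughly 1 KB each, as reported.
        long[] manySmall = new long[3000];
        java.util.Arrays.fill(manySmall, 1024L);
        System.out.println("many small files: " + shouldMerge(manySmall, threshold));

        // One very large file among small ones pulls the average above the threshold.
        long[] oneHuge = {100_000_000_000L, 1024L, 1024L};
        System.out.println("one huge file:    " + shouldMerge(oneHuge, threshold));
    }
}
```

By this rule alone the reported output (thousands of files of about 1 KB) would qualify for a merge, which suggests the filtering seen at runtime has some other cause.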

Another possibility is that your Hadoop installation does not support CombineHiveInputFormat, which is used for the new merge job. Someone previously reported that the merge was not successful because of this. If that's the case, you can turn off CombineHiveInputFormat and use the old HiveInputFormat (though slower) by setting hive.mergejob.maponly=false.

Ning
On Nov 17, 2010, at 6:00 PM, Leo Alekseyev wrote:

> I have jobs that sample (or generate) a small amount of data from a
> large table.  At the end, I get e.g. about 3000 or more files of 1kb
> or so.  This becomes a nuisance.  How can I make Hive do another pass
> to merge the output?  I have the following settings:
> 
> hive.merge.mapfiles=true
> hive.merge.mapredfiles=true
> hive.merge.size.per.task=256000000
> hive.merge.size.smallfiles.avgsize=16000000
> 
> After setting hive.merge* to true, Hive started indicating "Total
> MapReduce jobs = 2".  However, after generating the
> lots-of-small-files table, Hive says:
> Ended Job = job_201011021934_1344
> Ended Job = 781771542, job is filtered out (removed at runtime).
> 
> Is there a way to force the merge, or am I missing something?
> --Leo