Posted to user@crunch.apache.org by Ben Roling <be...@gmail.com> on 2018/01/30 20:22:25 UTC

Re: Figuring out to which CombineFileSplit the input record of DoFn process each record belongs to

We recently ran into a situation where we had this same sort of need.  I
logged https://issues.apache.org/jira/browse/CRUNCH-663 with a potential
solution.  I'd welcome any feedback.

On Thu, Nov 10, 2016 at 12:48 PM Marcin Michalski <mm...@ifwe.co>
wrote:

> That doesn't work, since casting CombineFileSplit to FileSplit gives a
> ClassCastException:
>
> Error: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
> cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit at
> com.tagged.ramblas.archive.ArchiveCrunchFns$FromEvent2RedhshiftWithPartitionInfoFn.process(ArchiveCrunchFns.java:205)
> a
>
> On Thu, Nov 10, 2016 at 10:30 AM, David Ortiz <do...@videologygroup.com>
> wrote:
>
>> Looks like what I was doing was:
>>
>>
>>
>> String loc = ((FileSplit) ((Supplier<InputSplit>) ((MapContext)
>>       this.getContext()).getInputSplit()).get()).getPath().toString();
>>
>>
>>
>>
>>
>> I believe this was occurring on combined files, but it’s been a while, so
>> I am not 100% sure.
>>
>>
>>
>> *From:* Marcin Michalski [mailto:mmichalski@ifwe.co]
>> *Sent:* Thursday, November 10, 2016 1:22 PM
>> *To:* user@crunch.apache.org
>> *Subject:* Re: Figuring out to which CombineFileSplit the input record
>> of DoFn process each record belongs to
>>
>>
>>
>> I have taken a look but don't see anything that would give me access to
>> the input record's dir location.  Could you point me in the right direction?
>>
>>
>> On Nov 9, 2016, at 7:50 PM, David Ortiz <do...@videologygroup.com>
>> wrote:
>>
>> I can look up the exact methods in the morning, but in short, the DoFn
>> does have a way to grab the TaskContext object when running using
>> MapReduce.  From there you can get the split.
>>
>>
>>
>> *Sent from my Verizon Wireless 4G LTE DROID*
>>
>> On Nov 9, 2016 9:56 PM, Marcin Michalski <mm...@ifwe.co> wrote:
>>
>> Hi, is it possible to tie each record passed to a DoFn's process method to
>> the single input split within a CombineFileSplit that it came from? I
>> basically want to get some info from the HDFS directory of the input split
>> (/data/20161109/11) and use it to enhance each record that is read by the
>> process method. I was able to hack around the access issue of
>> CrunchInputSplit by using reflection, but I am not sure how to tie each
>> input record to one input split, since my job reads multiple files from
>> different directories that carry the date/hour information I need.
>>
>>
>>
>>
>>
>> @Override
>> public void process(GenericData.Record eventRecord,
>>         Emitter<Pair<String, GenericData.Record>> pairEmitter) {
>>     if (getContext() instanceof MapContext) {
>>         InputSplit inputSplit = ((MapContext) getContext()).getInputSplit();
>>         Class<? extends InputSplit> splitClass = inputSplit.getClass();
>>
>>         try {
>>             Method getInputSplitMethod = splitClass.getDeclaredMethod("get");
>>             getInputSplitMethod.setAccessible(true);
>>             CombineFileSplit fileSplit =
>>                     (CombineFileSplit) getInputSplitMethod.invoke(inputSplit);
>>
>>             System.out.println("number of input files: "
>>                     + fileSplit.getPaths().length);
>>             int index = 0;
>>             for (Path p : fileSplit.getPaths()) {
>>                 System.out.println("split length: " + fileSplit.getLength(index)
>>                         + " partition: " + getPartitionDt(fileSplit.getPath(index)));
>>                 index++;
>>             }
>>         } catch (Exception e) {
>>             System.out.println("we have a problem");
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>>
>>
>> ...now I want to output a pair of partition info (YYYYMMDDHH) and some
>> modified Avro record. Any idea how I can get the directory information of
>> the input split that is being processed by each call of the process method?
>>
>> ...
>>
>> pairEmitter.emit(Pair.of(partition, some_avro_record));
>>
>>
>>
>> I know that I could disable the combined input file format, but I don't
>> want to do that.
>>
>>
>>
>> Thanks!
>>
>> --
>>
>> Marcin Michalski | Big Data Engineer
>>
>> mmichalski@ifwe.co <am...@ifwe.co> | (917) 478-9422 (c)
>>
>> <http://www.ifwe.co/>
>>
>> Tagged, Inc. is now if(we). Learn more at ifwe.co
>>
>> *This email is intended only for the use of the individual(s) to whom it
>> is addressed. If you have received this communication in error, please
>> immediately notify the sender and delete the original email.*
>>
>> *Disclaimer*
>>
>> The information contained in this communication from the sender is
>> confidential. It is intended solely for use by the recipient and others
>> authorized to receive it. If you are not the recipient, you are hereby
>> notified that any disclosure, copying, distribution or taking action in
>> relation of the contents of this information is strictly prohibited and may
>> be unlawful.
>>
>> This email has been scanned for viruses and malware, and may have been
>> automatically archived by *Mimecast Ltd*, an innovator in Software as a
>> Service (SaaS) for business. Providing a *safer* and *more useful* place
>> for your human generated data. Specializing in; Security, archiving and
>> compliance. To find out more Click Here
>> <http://www.mimecast.com/products/>.
>>
>>
>
>
>
> --
> Marcin Michalski | Big Data Engineer
> mmichalski@ifwe.co <am...@ifwe.co> | (917) 478-9422 (c)
> <http://www.ifwe.co/>
> Tagged, Inc. is now if(we). Learn more at ifwe.co
>
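
The quoted DoFn calls getPartitionDt(...), but the thread never shows that
helper. Below is a minimal sketch of what such a helper might look like,
assuming the input directories follow the /data/YYYYMMDD/HH layout mentioned
in the original question. The class name PartitionPaths and the methods
partitionOf and commonPartition are hypothetical names chosen for
illustration; they are not part of Crunch or Hadoop. The sketch works on plain
path strings so it can run without a cluster.

```java
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Crunch or Hadoop.
final class PartitionPaths {
    // Assumed layout: .../YYYYMMDD/HH/... (for example /data/20161109/11/part-0.avro).
    private static final Pattern DATE_HOUR =
            Pattern.compile("/(\\d{8})/(\\d{2})(?:/|$)");

    private PartitionPaths() {}

    /** Returns YYYYMMDDHH for a path, or empty if no date/hour directories are found. */
    public static Optional<String> partitionOf(String path) {
        Matcher m = DATE_HOUR.matcher(path);
        return m.find() ? Optional.of(m.group(1) + m.group(2)) : Optional.empty();
    }

    /**
     * Returns the partition shared by every path, or empty if the paths span
     * more than one partition, if any path does not match, or if none are given.
     */
    public static Optional<String> commonPartition(String... paths) {
        Set<String> seen = new LinkedHashSet<>();
        for (String p : paths) {
            Optional<String> part = partitionOf(p);
            if (!part.isPresent()) {
                return Optional.empty();
            }
            seen.add(part.get());
        }
        return seen.size() == 1 ? Optional.of(seen.iterator().next()) : Optional.empty();
    }

    public static void main(String[] args) {
        // prints 2016110911
        System.out.println(partitionOf("/data/20161109/11/part-0.avro").orElse("none"));
        // prints mixed, because the two files sit in different hour directories
        System.out.println(commonPartition(
                "/data/20161109/11/a.avro",
                "/data/20161109/12/b.avro").orElse("mixed"));
    }
}
```

Inside the DoFn, the strings could come from calling toString() on each Path
returned by fileSplit.getPaths(). If commonPartition returns a value, every
record in that combined split shares one partition and the split-level
information is enough. If it returns empty, the split mixes partitions and
this helper alone cannot say which file a given record came from; that is the
part the thread leaves open and that CRUNCH-663 proposes to address.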