Posted to user@crunch.apache.org by Marcin Michalski <mm...@ifwe.co> on 2016/11/10 02:56:06 UTC

Figuring out which file of a CombineFileSplit each record in a DoFn's process method belongs to

Hi, is it possible to tie each record seen by a DoFn's process method to the
single input file it came from inside a CombineFileSplit? I basically want to
get some info from the HDFS directory of the input split (/data/*20161109/11*)
and use it to enhance each record that is being read by the process method.
I was able to hack around the access restriction on CrunchInputSplit by using
reflection, but I am not sure how to tie each input record to one input split,
since my job reads multiple files from different directories that carry the
date/hour information I need.


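import java.lang.reflect.Method;

import org.apache.avro.generic.GenericData;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

@Override
public void process(GenericData.Record eventRecord,
                    Emitter<Pair<String, GenericData.Record>> pairEmitter) {
    if (getContext() instanceof MapContext) {
        InputSplit inputSplit = ((MapContext<?, ?, ?, ?>) getContext()).getInputSplit();
        Class<? extends InputSplit> splitClass = inputSplit.getClass();

        try {
            // CrunchInputSplit is not accessible from user code, so call its
            // get() method via reflection to reach the wrapped CombineFileSplit.
            Method getInputSplitMethod = splitClass.getDeclaredMethod("get");
            getInputSplitMethod.setAccessible(true);
            CombineFileSplit fileSplit =
                    (CombineFileSplit) getInputSplitMethod.invoke(inputSplit);

            System.out.println("number of input files: " + fileSplit.getNumPaths());
            // Lists every file combined into this split, but does not tell
            // which of them the current eventRecord was read from.
            // getPartitionDt(...) is a separate helper, not shown here.
            for (int index = 0; index < fileSplit.getNumPaths(); index++) {
                System.out.println("split length: " + fileSplit.getLength(index)
                        + " partition: " + getPartitionDt(fileSplit.getPath(index)));
            }
        } catch (Exception e) {
            System.out.println("we have a problem");
            e.printStackTrace();
        }
    }
}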
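
The snippet above calls getPartitionDt, which the post does not show. Below is a minimal, hypothetical sketch of such a helper, written against a plain String so it runs without Hadoop on the classpath (it could be fed fileSplit.getPath(index).toString()). The class name PartitionFromPath and the directory layout it expects (an eight-digit date ending one directory name, followed by a two-digit hour starting the next, as suggested by /data/*20161109/11*) are assumptions. It only turns a path into YYYYMMDDHH; it does not by itself reveal which file of the combined split the current record came from.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical stand-in for a getPartitionDt(...) helper.
 * ASSUMPTION: paths look like .../<prefix>YYYYMMDD/HH<suffix>/<file>,
 * inferred from the /data/*20161109/11* glob; other layouts return empty.
 */
public final class PartitionFromPath {

    // Eight digits immediately before a '/', then a two-digit hour at the
    // start of the next segment that is not followed by a third digit.
    private static final Pattern DATE_HOUR = Pattern.compile("(\\d{8})/(\\d{2})(?!\\d)");

    private PartitionFromPath() {}

    /** Returns "YYYYMMDDHH" for a matching path, or Optional.empty() otherwise. */
    public static Optional<String> partitionDt(String path) {
        Matcher m = DATE_HOUR.matcher(path);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(m.group(1) + m.group(2));
    }

    public static void main(String[] args) {
        String p = "hdfs://namenode:8020/data/events_20161109/11/part-m-00000.avro";
        System.out.println(partitionDt(p).orElse("unknown"));
    }
}
```

Run as-is, main prints 2016110911. Returning Optional makes a path that does not follow the assumed layout visible to the caller instead of silently producing a bogus partition key.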

...now I want to output a pair of the partition info (YYYYMMDDHH) and some
modified Avro record. Any idea how I can get the directory information of
the input split that is being processed by each call of the process method?
...
pairEmitter.emit(Pair.of(partition, some_avro_record));

I know that I could disable the combined input format, but I don't want
to do that.

Thanks!
-- 
Marcin Michalski | Big Data Engineer
mmichalski@ifwe.co | (917) 478-9422 (c)
Tagged, Inc. is now if(we). Learn more at ifwe.co