Posted to user@flink.apache.org by Ted Yu <yu...@gmail.com> on 2017/08/03 21:32:59 UTC

Re: Customer inputformat

Did you use StreamExecutionEnvironment.createFileInput()?

What did the modification times of the 2 files look like (were they the
newest)?
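
One way to check is to list every file in the directory with its
last-modified time, oldest first. The following is only a rough sketch in
plain java.nio, not Flink code: the class name ListModTimes and the helper
listByModTime are made up for illustration, and the directory is taken from
the command line. It assumes that modification time is what decides which
files get picked up, which is a guess that this listing should help confirm
or rule out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

public class ListModTimes {

    /** Returns the regular files directly under dir, oldest modification time first. */
    public static List<Path> listByModTime(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        // Files.list holds an open directory handle, so close the stream when done.
        try (Stream<Path> entries = Files.list(dir)) {
            entries.filter(Files::isRegularFile).forEach(files::add);
        }
        files.sort(Comparator.comparingLong(ListModTimes::modTimeMillis));
        return files;
    }

    /** Last-modified time in epoch milliseconds, with IOException wrapped for use in a comparator. */
    public static long modTimeMillis(Path p) {
        try {
            return Files.getLastModifiedTime(p).toMillis();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: java ListModTimes c:\proj\test
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path f : listByModTime(dir)) {
            System.out.println(Files.getLastModifiedTime(f) + "  " + f.getFileName());
        }
    }
}
```

If the 2 files that were read sit at the end of this listing, that would
point at the modification times.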

Cheers

On Mon, Jul 31, 2017 at 12:42 PM, Mohit Anchlia <mo...@gmail.com>
wrote:

> Thanks! When I give the path to a directory, Flink reads only 2 files. It
> seems to pick these 2 files at random.
>
> On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Mohit,
>>
>> as Ted said, there are plenty of InputFormats which are based on
>> FileInputFormat.
>> FileInputFormat also supports reading all files in a directory. Simply
>> specify the path of the directory.
>>
>> Check StreamExecutionEnvironment.createFileInput(), which takes several
>> parameters such as a FileInputFormat and a time interval in which the
>> directory is periodically checked.
>>
>> Best, Fabian
>>
>> 2017-07-30 21:31 GMT+02:00 Ted Yu <yu...@gmail.com>:
>>
>>> For #1, you can find quite a few classes which extend FileInputFormat.
>>> e.g.
>>>
>>> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:
>>>     public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
>>> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:
>>>     public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
>>> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:
>>>     public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
>>>
>>> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:
>>>     extends FileInputFormat<String>
>>>
>>> FYI
>>>
>>> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mo...@gmail.com>
>>> wrote:
>>>
>>>> Thanks. A few more questions:
>>>>
>>>> - Is there an example for FileInputFormat?
>>>> - How do I make it read all the files in a directory?
>>>> - How do I make an InputFormat a streaming input instead of batch? E.g.,
>>>> read files as they arrive in a directory.
>>>>
>>>> Thanks again.
>>>>
>>>> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink calls the reachedEnd() method before it calls nextRecord() and
>>>>> closes the InputFormat when reachedEnd() returns true.
>>>>> So it should not return true until nextRecord() has been called and the
>>>>> single record of the split has been emitted.
>>>>>
>>>>> You might also want to build your PDFFileInputFormat on
>>>>> FileInputFormat and set unsplittable to true.
>>>>> FileInputFormat comes with lots of built-in functionality such as
>>>>> InputSplit generation.
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mo...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I created a custom input format. The idea behind this is to read all
>>>>>> binary files from a directory and use each file as its own split. Each
>>>>>> split is read as one whole record. When I run it in Flink I don't get any
>>>>>> error, but I am not seeing any output from .print. Am I missing something?
>>>>>>
>>>>>> ----
>>>>>>
>>>>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>>>>
>>>>>>     private static final Logger logger =
>>>>>>         LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>>>>
>>>>>>     PDFFileInputSplit current = null;
>>>>>>
>>>>>>     public static void main(String... args) throws Exception {
>>>>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>>>>         pdfReader.open(splits[0]);
>>>>>>         pdfReader.nextRecord(null);
>>>>>>
>>>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>>         env.fromElements(1, 2, 3)
>>>>>>             // returns the squared i
>>>>>>             .print();
>>>>>>
>>>>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>>>>         InputFormatSourceFunction<StringValue> reader =
>>>>>>             new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));
>>>>>>
>>>>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>>>     }
>>>>>>
>>>>>>     String path = null;
>>>>>>
>>>>>>     public PDFFileInputFormat(String path) {
>>>>>>         this.path = path;
>>>>>>     }
>>>>>>
>>>>>>     public void configure(Configuration parameters) {
>>>>>>         // TODO Auto-generated method stub
>>>>>>     }
>>>>>>
>>>>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>>>>         // TODO Auto-generated method stub
>>>>>>         return cachedStatistics;
>>>>>>     }
>>>>>>
>>>>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>>>>             splits.add(split);
>>>>>>         });
>>>>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>>>>         return splits.toArray(inputSplitArray);
>>>>>>     }
>>>>>>
>>>>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>>>>         logger.info("Assigner");
>>>>>>         // TODO Auto-generated method stub
>>>>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>>>>     }
>>>>>>
>>>>>>     public void open(InputSplit split) throws IOException {
>>>>>>         this.current = (PDFFileInputSplit) split;
>>>>>>     }
>>>>>>
>>>>>>     public boolean reachedEnd() throws IOException {
>>>>>>         // TODO Auto-generated method stub
>>>>>>         return true;
>>>>>>     }
>>>>>>
>>>>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>>>>         logger.info("Content " + content);
>>>>>>         return new StringValue(content);
>>>>>>     }
>>>>>>
>>>>>>     public void close() throws IOException {
>>>>>>         // TODO Auto-generated method stub
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> ---
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mohit
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
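
Going back to Fabian's point about reachedEnd() further down the thread: the
read loop can be pictured roughly as below. This is a plain-Java sketch with
no Flink dependency. OneRecordReader and readAll are hypothetical names, and
the loop only mimics the call order Fabian described (reachedEnd() is checked
before each nextRecord()). It is not how Flink itself is implemented. A
format that emits one record per split needs a flag that flips only once that
record has been returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReachedEndSketch {

    /** Hypothetical stand-in for a format that emits exactly one record per split. */
    public static class OneRecordReader {
        private String content;      // the whole "file" handed over in open()
        private boolean end = true;  // nothing to read until open() is called

        public void open(String splitContent) {
            this.content = splitContent;
            this.end = false;        // a freshly opened split has one record pending
        }

        public boolean reachedEnd() {
            return end;
        }

        public String nextRecord() {
            end = true;              // the single record is consumed by this call
            return content;
        }
    }

    /** Mimics the described call order: check reachedEnd() first, then read. */
    public static List<String> readAll(OneRecordReader reader, List<String> splits) {
        List<String> out = new ArrayList<>();
        for (String split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("file-1 bytes", "file-2 bytes");
        // prints [file-1 bytes, file-2 bytes]
        System.out.println(readAll(new OneRecordReader(), splits));
    }
}
```

With reachedEnd() hard-coded to return true, as in the posted format, the
while loop body never runs, so nothing would reach print().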