Posted to user@flink.apache.org by Mohit Anchlia <mo...@gmail.com> on 2017/08/01 00:09:33 UTC

Odd flink behaviour

I have a very simple program that just reads all the files in the path.
However, Flink is not working as expected.

Every time I execute this job, I only see Flink reading 2 files, even though
there are more in that directory. On closer inspection, it appears that this
might be related to:

[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task
slot(s).

My question is: isn't Flink supposed to continue iterating over the directory
once those 2 slots become free again? I am assuming the problem is caused by
there being only 2 slots.


Code ---

  PDFFileInputFormat format = new PDFFileInputFormat();
  format.setFilePath(args[0]);
  format.setNestedFileEnumeration(true);
  logger.info("Number of splits " + format.getNumSplits());

  // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());

  // PDFFileInputFormat produces String records, so the type information must match.
  env.createInput(format, TypeInformation.of(String.class)).print();

Re: Odd flink behaviour

Posted by Mohit Anchlia <mo...@gmail.com>.
Thanks. I thought the purpose of the method below was to supply that
information?

@Override
public boolean reachedEnd() throws IOException {
  logger.info("Reached " + reached);
  return reached;
}

On Wed, Aug 2, 2017 at 1:43 AM, Fabian Hueske <fh...@gmail.com> wrote:

> FileInputFormat cannot know about the reached variable that you added in
> your class. So there is no way it could reset it to false.
> An alternative implementation without overriding open() could be to change
> the reachedEnd method to check if the stream is still at offset 0.

Re: Odd flink behaviour

Posted by Fabian Hueske <fh...@gmail.com>.
FileInputFormat cannot know about the reached variable that you added in
your class. So there is no way it could reset it to false.
An alternative implementation without overriding open() could be to change
the reachedEnd method to check if the stream is still at offset 0.
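
A minimal sketch of that alternative, written against simplified stand-in
classes rather than Flink's own: BaseFormat, WholeFileFormat, data and pos are
hypothetical names used only for illustration. The base class resets the state
it owns in open(), and the subclass derives the end condition from the read
offset instead of keeping its own flag. In a real FileInputFormat subclass the
analogous check would presumably be against the position of the stream opened
for each split, and it would only work if records are actually read through
that stream.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetBasedEndSketch {

    /** Hypothetical stand-in for a base format; NOT Flink's FileInputFormat. */
    abstract static class BaseFormat {
        protected byte[] data; // contents of the current split
        protected int pos;     // read offset into the current split

        // The base class can only reset state that it owns itself.
        void open(byte[] splitData) {
            data = splitData;
            pos = 0;
        }

        abstract boolean reachedEnd();

        abstract String nextRecord();
    }

    /** Whole-file reader without an extra flag: the end is derived from the offset. */
    static class WholeFileFormat extends BaseFormat {
        @Override
        boolean reachedEnd() {
            // Offset 0 means nothing has been read from this split yet.
            // Empty splits are treated as finished to avoid looping forever.
            return pos != 0 || data.length == 0;
        }

        @Override
        String nextRecord() {
            String content = new String(data, StandardCharsets.UTF_8);
            pos = data.length; // the whole split is consumed as one record
            return content;
        }
    }

    /** Simplified driver: a single format instance is reused for every split. */
    static List<String> readAll(BaseFormat format, List<byte[]> splits) {
        List<String> records = new ArrayList<>();
        for (byte[] split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> splits = Arrays.asList(bytes("one"), bytes("two"), bytes("three"));
        System.out.println(readAll(new WholeFileFormat(), splits)); // prints [one, two, three]
    }
}
```

Because the offset is reset by the base class on every open(), the subclass
needs no state of its own. The cost is the special case for empty splits.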

2017-08-01 20:22 GMT+02:00 Mohit Anchlia <mo...@gmail.com>:

> Thanks, that worked. However, what I don't understand is this: wouldn't the
> open() method that I am inheriting already have this logic built in? I am
> inheriting from FileInputFormat.

Re: Odd flink behaviour

Posted by Mohit Anchlia <mo...@gmail.com>.
Thanks, that worked. However, what I don't understand is this: wouldn't the
open() method that I am inheriting already have this logic built in? I am
inheriting from FileInputFormat.

On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske <fh...@gmail.com> wrote:

> An InputFormat processes multiple InputSplits. open() is called for each
> InputSplit.
> If you don't reset reached to false in open() you will only read a single
> (i.e., the first) InputSplit and skip all others.
>
> I'd override open as follows:
>
> public void open(FileInputSplit fileSplit) throws IOException {
>   super.open(fileSplit);
>   reached = false;
> }
>
> Cheers, Fabian

Re: Odd flink behaviour

Posted by Fabian Hueske <fh...@gmail.com>.
An InputFormat processes multiple InputSplits. open() is called for each
InputSplit.
If you don't reset reached to false in open() you will only read a single
(i.e., the first) InputSplit and skip all others.

I'd override open as follows:

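@Override
public void open(FileInputSplit fileSplit) throws IOException {
  // Let FileInputFormat open the stream for this split first.
  super.open(fileSplit);
  // Then clear the per-split flag so the new split is actually read.
  reached = false;
}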
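
To illustrate why the reset matters, here is a minimal sketch that does not use
Flink at all. SimpleFormat, NoResetFormat, ResetFormat and readAll are
hypothetical stand-ins for illustration only, and the driver loop is a
simplified assumption about how one format instance is reused across the
splits assigned to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLifecycleSketch {

    /** Hypothetical stand-in for an input format; NOT Flink's actual interface. */
    interface SimpleFormat {
        void open(String split);

        boolean reachedEnd();

        String nextRecord();
    }

    /** Mirrors the posted format: the flag is never reset between splits. */
    static class NoResetFormat implements SimpleFormat {
        protected String current;
        protected boolean reached = false;

        @Override
        public void open(String split) {
            current = split;
        }

        @Override
        public boolean reachedEnd() {
            return reached;
        }

        @Override
        public String nextRecord() {
            reached = true; // one record per split
            return "content of " + current;
        }
    }

    /** Same format, but the flag is cleared every time a split is opened. */
    static class ResetFormat extends NoResetFormat {
        @Override
        public void open(String split) {
            super.open(split);
            reached = false;
        }
    }

    /** Simplified driver: a single format instance is reused for every split. */
    static List<String> readAll(SimpleFormat format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        System.out.println(readAll(new NoResetFormat(), splits).size()); // prints 1
        System.out.println(readAll(new ResetFormat(), splits).size());   // prints 3
    }
}
```

Without the reset, each instance reads only the first split it is given. That
would be consistent with seeing exactly 2 files when two parallel instances
are running.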

Cheers, Fabian

2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mo...@gmail.com>:

> I didn't override open(). I am using the open() inherited from
> FileInputFormat. Am I supposed to override open() explicitly?

Re: Odd flink behaviour

Posted by Mohit Anchlia <mo...@gmail.com>.
I didn't override open(). I am using the open() inherited from
FileInputFormat. Am I supposed to override open() explicitly?

On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Do you set reached to false in open()?

Re: Odd flink behaviour

Posted by Fabian Hueske <fh...@gmail.com>.
Do you set reached to false in open()?


Re: Odd flink behaviour

Posted by Mohit Anchlia <mo...@gmail.com>.
And here is the InputFormat code:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.api.common.io.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PDFFileInputFormat extends FileInputFormat<String> {

  private static final long serialVersionUID = -4137283038479003711L;
  private static final Logger logger =
      LoggerFactory.getLogger(PDFFileInputFormat.class);

  // Set once the current file has been returned as a record.
  private boolean reached = false;

  @Override
  public boolean reachedEnd() throws IOException {
    logger.info("called reached " + reached);
    return reached;
  }

  @Override
  public String nextRecord(String reuse) throws IOException {
    logger.info("This is where you parse PDF");
    // Reads the whole file behind the current split as a single record.
    String content = new String(
        Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
    logger.info("Content " + content);
    reached = true;
    return content;
  }
}
