Posted to dev@flink.apache.org by Shannon Quinn <sq...@gatech.edu> on 2016/03/24 21:20:14 UTC

Native iterations in PyFlink

Hi all,

I'm looking at Flink for highly iterative ALS-like distributed 
computations, and the concept of native iteration support was very 
attractive. However, I notice that the Python API is missing this item. 
I'd absolutely be interested in adding that component if someone could 
point me in the right direction. Thanks!

Shannon

Re: Native iterations in PyFlink

Posted by Chesnay Schepler <ch...@apache.org>.
Hello Shannon,

  you've picked yourself quite a feature there.

The following classes will be relevant:

  * Python
      o DataSet
      o OperationInfo
      o Environment (_send_operation method)
      o Constants._Identifier
  * Java
      o PythonPlanBinder
      o PythonOperationInfo

A (Python)OperationInfo is a generic container for all arguments that 
are required to define an operation.
OperationInfos store information about parent/children sets, and thus 
form a doubly linked tree structure.
These objects are transferred 1:1 to Java (with the exception of a few 
internal fields).
The contained arguments are sent in Environment._send_operation, and 
received in the PythonOperationInfo constructor.
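
To make the container idea concrete, here is a minimal sketch in Python. It is
not the actual OperationInfo class: the field names (identifier, set_id, args)
and the serialize() helper are assumptions made up for illustration. It only
shows the two properties described above: links in both directions between
parent and child, and a flat payload in which the links are replaced by IDs
before being sent.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(eq=False)
class OperationInfo:
    """Hypothetical stand-in for OperationInfo; field names are illustrative."""
    identifier: str                                   # which operation this is
    set_id: int                                       # ID of the resulting DataSet
    parent: Optional["OperationInfo"] = None          # link up the tree
    children: List["OperationInfo"] = field(default_factory=list)  # links down
    args: dict = field(default_factory=dict)          # operation-specific arguments

    def add_child(self, child: "OperationInfo") -> None:
        # Maintain both directions of the link.
        child.parent = self
        self.children.append(child)


def serialize(info: OperationInfo) -> dict:
    """Flatten what would be sent to Java; object links become plain IDs."""
    return {
        "identifier": info.identifier,
        "id": info.set_id,
        "parent_id": info.parent.set_id if info.parent else None,
        **info.args,
    }


source = OperationInfo("source", 0)
mapped = OperationInfo("map", 1)
source.add_child(mapped)
payload = serialize(mapped)
```

The internal fields that are not transferred correspond here to the parent and
children object references; only their IDs end up in the payload.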

The DataSet class resembles the Java DataSet class. Every operation 
generates an OperationInfo; you'll mostly deal with API design, and how 
to store the relevant information inside an OperationInfo.

The PythonPlanBinder effectively iterates over all OperationInfos, and 
reconstructs the final Java program from them. Operations are
recreated in the order in which they were defined in the Python API, and 
all created DataSets are stored in a Map of DataSetID -> DataSet.
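
The reconstruction loop can be modelled roughly as follows. The real binder is
Java; this is a Python model only, with plain lists standing in for DataSets
and dictionaries standing in for the received operation infos. The handler
names and dictionary keys are assumptions, not the binder's actual methods.

```python
def create_source(sets, info):
    # A source has no parent; it just materialises its values.
    return list(info["values"])


def create_map(sets, info):
    # Look up the parent set by ID, then apply the operation to it.
    parent = sets[info["parent_id"]]
    return [info["function"](x) for x in parent]


# One handler per operation identifier (hypothetical names).
HANDLERS = {"source": create_source, "map": create_map}


def rebuild_plan(operations):
    """Replay operations in definition order, storing each result by set ID."""
    sets = {}
    for info in operations:
        handler = HANDLERS[info["identifier"]]
        sets[info["id"]] = handler(sets, info)
    return sets


operations = [
    {"identifier": "source", "id": 0, "values": [1, 2, 3]},
    {"identifier": "map", "id": 1, "parent_id": 0, "function": lambda x: x * 2},
]
sets = rebuild_plan(operations)
```

Because operations arrive in definition order, a parent set is always in the
map before any operation that refers to it.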

For a batch iteration you will need to do the following:

  * add an iterate() method to the DataSet class
      o this method generates a new OperationInfo containing
        (_Identifier.ITERATION, ID of the dataset that it was applied
        on, iteration count)
      o should return an IterativeDataSet (new class)
  * a new IterativeDataSet class that extends the DataSet, offering a
    new closeWith method
      o generate a new OperationInfo containing
        (_Identifier.ITERATION_CLOSE, ID of the dataSet it was applied
        on, ID of the resultSet [, ID of the terminationCriterion])
  * within PythonPlanBinder you'll have to add 2 new methods:
      o   createIterationOperation()
          + fetch DataSet to apply iterate on (sets.get(info.parentID))
          + apply dataSet.iterate(info.getIterationCount)
          + store resulting set in the map
      o   createIterationCloseOperation()
          + fetch the IterativeDataSet to apply closeWith on
          + fetch resultSet/terminationCriterion dataSet
              # you'll have to account for both closeWith(resultSet) and
                closeWith(resultSet, terminationCriterion)!
          + apply closeWith, store resulting set in the map

I never looked too deeply into iterations myself, so I'm not sure 
whether there will be issues at runtime. But for the API the above steps 
should point you in the right direction. Delta-iterations should follow 
a similar pattern.

Feel free to mail me directly if you need further help.
