Posted to user@flink.apache.org by 김동원 <ea...@gmail.com> on 2017/03/14 14:47:32 UTC

Proper way to call a Python function in WindowFunction.apply()

Hi all,

What is the proper way to call a Python function in WindowFunction.apply()?

I want to apply a Python function to values in a fixed-size sliding window.
I'm trying it because:
- I'm currently working on time-series prediction using deep learning, which is why I need a sliding window to get the latest N items from the unbounded data stream.
- I already have a DNN written using Keras on top of Theano (Keras and Theano are Python libraries) in order to exploit Nvidia's CUDA library.
- There is no Python DataStream API, so I tried to use the Scala DataStream API.
- PySpark's Structured Streaming does not allow me to define a UDAF (see the question I posted on Stack Overflow about it: http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0)
- Spark's DStream API does not look promising for this case due to its lack of support for count windows.

For these reasons, I quickly wrote a toy example to check the feasibility of applying Python methods to values in the sliding window.
--------
import jep.Jep
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
  val jep = new Jep()
  jep.runScript("prediction.py")

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}

object main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)
      .countWindowAll(5, 1)
      .apply(new WindowFunction())
      .print()
    env.execute()
  }
}
--------

Now I'm facing a serialization error with the following message:
--------
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
	at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
	at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
	at org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
	at org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
	at main$.main(main.scala:23)
	at main.main(main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: jep.Jep
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
	at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
	... 11 more
--------

Apparently, the source of the problem is the third-party library Jep, which is used to call Python scripts.
Do I have to make the third-party library serializable?
Or is there a way to handle this sort of thing differently in Flink?

Any help (even with frameworks other than Flink) will be appreciated :-)
Thank you.

- Dongwon

Re: Proper way to call a Python function in WindowFunction.apply()

Posted by 김동원 <ea...@gmail.com>.
Alright, it works perfectly.
I checked that my Python methods are properly executed inside the RichAllWindowFunction.
Thanks a lot!

P.S. For those who wonder why I use Jep, refer to https://sushant-hiray.me/posts/python-in-scala-stack/ to grasp the idea of using Python inside Java through Jep instead of Jython and JyNI.


------------
import jep.Jep
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  // Jep is not serializable, so it is created per parallel task in open() rather than in the constructor.
  var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    jep = Some(new Jep())
    jep foreach (_.runScript("prediction.py"))
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}
------------
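The apply() body is elided above; for illustration only, a call into the Python side through Jep might look like the following sketch. The function name "predict" is an assumption, not something taken from the actual prediction.py:

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    jep foreach { j =>
      // Hand the window contents to a hypothetical predict() defined in prediction.py.
      val values = new java.util.ArrayList[String]()
      iter foreach values.add
      val result = j.invoke("predict", values)  // the Python return value, as a Java Object
      out.collect(String.valueOf(result))
    }
  }

A matching close() override releases the interpreter when the task shuts down. Jep instances may only be used from the thread that created them, which works here because Flink calls open(), apply(), and close() of the same operator instance on the same task thread:

  override def close(): Unit = {
    jep foreach (_.close())
    jep = None
  }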
> On Mar 15, 2017, at 1:27 AM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> Hey,
> 
> Naturally this would imply that your script is available on all nodes, so you will have to distribute it manually.
> 
> On 14.03.2017 17:23, Chesnay Schepler wrote:
>> Hello,
>> 
>> I would suggest implementing the RichWindowFunction instead and instantiating Jep within open(), or maybe doing some lazy instantiation within apply().
>> 
>> Regards,
>> Chesnay
>> 


Re: Proper way to call a Python function in WindowFunction.apply()

Posted by Chesnay Schepler <ch...@apache.org>.
Hey,

Naturally this would imply that your script is available on all nodes, so you will have to distribute it manually.
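For example, a minimal sketch of failing fast in open() when the script has not been distributed to a node. The path handling and the jep field are assumptions, following the RichAllWindowFunction pattern discussed elsewhere in this thread:

  override def open(parameters: Configuration): Unit = {
    // Assumed location of the manually distributed script on every TaskManager node.
    val script = java.nio.file.Paths.get("prediction.py")
    require(java.nio.file.Files.exists(script),
      s"prediction.py not found on this node: ${script.toAbsolutePath}")
    jep = Some(new Jep())
    jep foreach (_.runScript(script.toString))
  }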

On 14.03.2017 17:23, Chesnay Schepler wrote:
> Hello,
>
> I would suggest implementing the RichWindowFunction instead and instantiating Jep within open(), or maybe doing some lazy instantiation within apply().
>
> Regards,
> Chesnay


Re: Proper way to call a Python function in WindowFunction.apply()

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

I would suggest implementing the RichWindowFunction instead and instantiating Jep within open(), or maybe doing some lazy instantiation within apply().
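
A minimal sketch of the lazy variant, assuming the same Jep setup and window types as the code in the original question (untested):

import jep.Jep
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class LazyJepWindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
  // @transient keeps the non-serializable interpreter out of the serialized closure;
  // it is created on the worker the first time apply() runs.
  @transient private var jep: Jep = _

  override def apply(window: GlobalWindow, input: Iterable[String], out: Collector[String]): Unit = {
    if (jep == null) {
      jep = new Jep()
      jep.runScript("prediction.py")
    }
    // ... call into the Python side and emit results via out.collect(...) ...
  }
}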

Regards,
Chesnay
