Posted to user@flink.apache.org by 김동원 <ea...@gmail.com> on 2017/03/14 14:47:32 UTC
Proper way to call a Python function in WindowFunction.apply()
Hi all,
What is the proper way to call a Python function in WindowFunction.apply()?
I want to apply a Python function to values in a fixed-size sliding window.
I'm trying it because
- I'm currently working on time-series prediction using deep learning, which is why I need a sliding window to get the latest N items from the unbounded data stream.
- I already have a DNN written using Keras on top of Theano (Keras and Theano are Python libraries) in order to exploit Nvidia's CUDA library.
- There is no Python DataStream API, so I tried to use Scala DataStream API.
- PySpark's structured streaming does not allow me to define a UDAF (see a question I posted on stackoverflow about it: http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0)
- Spark's DStream API does not look promising for this case due to its lack of count-window support.
For these reasons, I hastily wrote a toy example to test the feasibility of applying Python functions to values in the sliding window.
--------
import jep.Jep
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
  val jep = new Jep()
  jep.runScript("prediction.py")

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}

object main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 9999)
      .countWindowAll(5, 1)
      .apply(new WindowFunction())
      .print()
    env.execute()
  }
}
--------
Now I'm facing a serialization error with the following messages:
--------
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
at org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
at main$.main(main.scala:23)
at main.main(main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: jep.Jep
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 11 more
--------
Apparently, the source of the problem is the third-party library Jep, which is used to call Python scripts.
Do I have to make the third-party library serializable?
Or is there a way to handle this sort of thing in a totally different way in Flink?
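(For readers hitting the same wall: the failure is independent of Flink and Jep. Java's standard serialization refuses to write any object that transitively holds a field of a non-serializable class, and that is exactly what Flink's ClosureCleaner checks before shipping a function to the workers. A minimal stand-alone sketch, where `NativeResource` is a hypothetical stand-in for `jep.Jep`:)

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for jep.Jep: a class that does NOT implement Serializable.
class NativeResource

// Mirrors the failing WindowFunction above: Serializable itself, but holding
// a non-serializable field that Java serialization must also write out.
class HoldsResource extends Serializable {
  val resource = new NativeResource
}

object SerializationDemo {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(new HoldsResource)
      println("serialized fine")
    } catch {
      // The same exception Flink's ClosureCleaner surfaces as "Task not serializable"
      case e: NotSerializableException => println(s"NotSerializableException: ${e.getMessage}")
    }
  }
}
```

So the fix is not to make Jep serializable, but to keep the Jep instance out of serialization entirely, which is what the replies below suggest.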
Any help (even with frameworks other than Flink) will be appreciated :-)
Thank you.
- Dongwon
Re: Proper way to call a Python function in WindowFunction.apply()
Posted by 김동원 <ea...@gmail.com>.
Alright, it works perfectly.
I checked that my Python methods are properly executed inside RichWindowFunction.
Thanks a lot!
p.s. for those who wonder why I use Jep, refer to https://sushant-hiray.me/posts/python-in-scala-stack/ to grasp the idea of using Python inside Java through Jep instead of Jython and JyNI.
------------
class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    jep = Some(new Jep())
    jep foreach (_.runScript("prediction.py"))
  }

  // Release the embedded Python interpreter when the task shuts down
  override def close(): Unit = jep foreach (_.close())

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}
> On 2017. 3. 15. at 1:27 AM, Chesnay Schepler <ch...@apache.org> wrote:
>
> Hey,
>
> Naturally this would imply that your script is available on all nodes, so you will have to distribute it manually.
Re: Proper way to call a Python function in WindowFunction.apply()
Posted by Chesnay Schepler <ch...@apache.org>.
Hey,
Naturally this would imply that your script is available on all nodes,
so you will have to distribute it manually.
Re: Proper way to call a Python function in WindowFunction.apply()
Posted by Chesnay Schepler <ch...@apache.org>.
Hello,
I would suggest implementing the RichWindowFunction instead, and
instantiate Jep within open(), or maybe do some lazy instantiation
within apply.
Regards,
Chesnay
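(The "lazy instantiation" option Chesnay mentions can be sketched without Flink at all: marking the field `@transient` keeps it out of serialization entirely, and `lazy` re-creates it on first access after the object has been deserialized on a worker, which is essentially what building the resource in `open()` achieves. `ExpensiveResource` here is a hypothetical stand-in for `jep.Jep`:)

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for jep.Jep: does NOT implement Serializable.
class ExpensiveResource {
  val ready = true
}

class LazyHolder extends Serializable {
  // @transient: the field is never serialized, so the closure cleaner is satisfied.
  // lazy: the resource is re-created on first access after deserialization.
  @transient lazy val resource: ExpensiveResource = new ExpensiveResource
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(new LazyHolder)  // would throw without @transient

    val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[LazyHolder]
    println(copy.resource.ready)  // resource rebuilt on the "worker" side
  }
}
```

In Flink itself the open()/close() lifecycle of a Rich function is usually the cleaner choice, since close() also gives a well-defined place to release the Python interpreter.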