You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Francis Conroy <fr...@switchdin.com> on 2022/01/07 05:00:24 UTC
pyflink mixed with Java operators
Hi all,
Does anyone know if it's possible to call a Java MapFunction at some
intermediate point in a PyFlink job? In this case
SimpleCountMeasurementsPerUUID
is a Flink Java MapFunction. The reason we want to do this is that
performance in PyFlink seems quite poor.
e.g.
import logging
import os
import sys
import zlib

from pyflink.common import Types
from pyflink.common.serialization import (
    KafkaRecordSerializationSchemaBuilder,
    SimpleStringSchema,
)
from pyflink.datastream import (
    CheckpointingMode,
    DataStream,
    MapFunction,
    RuntimeContext,
    RuntimeExecutionMode,
    StreamExecutionEnvironment,
)
from pyflink.datastream.connectors import KafkaSink, RMQConnectionConfig, RMQSource
from pyflink.java_gateway import get_gateway

import Measurements_pb2
from customisations.serialisation import ZlibDeserializationSchema
from functions.common import KeyByUUID
from functions.file_lister import auto_load_python_files
class ZlibDecompressor(MapFunction):
    """Inflate the second element of an indexable record.

    Element 0 of the incoming record is passed through unchanged. Element 1
    is decompressed with :func:`zlib.decompress`. The output is the 2-tuple
    ``(element_0, decompressed_element_1)``.
    """

    def map(self, value):
        inflated = zlib.decompress(value[1])
        passthrough = value[0]
        return (passthrough, inflated)
class MeasurementSnapshotCountMapFunction(MapFunction):
    """Summarise one serialized ``Measurements_pb2.MeasurementSnapshot``.

    ``map`` parses the incoming bytes as a ``MeasurementSnapshot``. If the
    snapshot holds at least one measurement, it returns the tuple
    ``(first.time, first.point_uuid.value, number_of_measurements)``, where
    ``first`` is the first measurement. Otherwise it returns ``None``.
    """

    def map(self, value):
        snapshot = Measurements_pb2.MeasurementSnapshot()
        snapshot.ParseFromString(value)
        total = len(snapshot.measurements)
        if total == 0:
            # Empty snapshot: there is no first measurement to report on.
            return None
        head = snapshot.measurements[0]
        uuid = head.point_uuid.value
        return head.time, uuid, total
def word_count():
    """Build and execute the RabbitMQ -> Python map -> Java map -> Kafka job.

    Pipeline:
      1. Consume the ``flink-test`` RabbitMQ queue, decoding each message
         with ``ZlibDeserializationSchema``.
      2. In Python, turn every ``MeasurementSnapshot`` into
         ``(timestamp, point_uuid, count)``, dropping empty snapshots.
      3. Key the stream with ``KeyByUUID``.
      4. Apply the Java MapFunction
         ``org.switchdin.operators.SimpleCountMeasurementsPerUUID``.
      5. Write the result to the ``flink-test-kafka`` Kafka topic.

    The function keeps its historical name for callers; it does not compute a
    word count. It blocks in ``env.execute()`` while the job runs.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    # The jar path is resolved against the current working directory, so the
    # job must be launched from a directory that is a sibling of ``src-java``.
    # NOTE(review): presumably this jar holds the Java-side classes used below
    # (e.g. SimpleCountMeasurementsPerUUID). Confirm that it does.
    jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
    env.add_jars(jarpath)
    auto_load_python_files(env)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # Run every operator with a single parallel instance.
    env.set_parallelism(1)
    env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("rabbitmq") \
        .set_port(5672) \
        .set_virtual_host("/") \
        .set_user_name("guest") \
        .set_password("guest") \
        .set_connection_timeout(60) \
        .set_prefetch_count(5000) \
        .build()

    deserialization_schema = ZlibDeserializationSchema()

    # Arguments: connection config, queue name, usesCorrelationId=False, schema.
    stream = env.add_source(RMQSource(
        connection_config,
        "flink-test",
        False,
        deserialization_schema,
    )).set_parallelism(1)

    # Python stage. MeasurementSnapshotCountMapFunction returns None for
    # snapshots without measurements, so those are filtered out before keying
    # and the key selector never receives None. The uid is kept unchanged so
    # existing savepoints still map onto this operator.
    keyed = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
        .filter(lambda record: record is not None) \
        .key_by(KeyByUUID(), key_type=Types.STRING())

    # Java stage. PyFlink has no ``jMap``. Instead, call ``map`` on the
    # underlying Java stream through the Py4J gateway, then wrap the Java
    # result back into a Python DataStream.
    # NOTE(review): no output_type is declared on the Python map above, so its
    # records reach the JVM in PyFlink's internal encoding, and key_by may wrap
    # them together with their key. Confirm that SimpleCountMeasurementsPerUUID
    # accepts that record type. Declaring output_type on the map is likely
    # required for a plain Java function to read the fields.
    jvm = get_gateway().jvm
    dstream = DataStream(keyed._j_data_stream.map(
        jvm.org.switchdin.operators.SimpleCountMeasurementsPerUUID()))

    # Values are written with SimpleStringSchema, so the Java map is expected
    # to emit strings.
    kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
        .set_value_serialization_schema(SimpleStringSchema()) \
        .set_topic("flink-test-kafka") \
        .build()

    dstream.sink_to(
        KafkaSink.builder()
        .set_record_serializer(kafka_serialisation_schema)
        .set_bootstrap_servers("kafka:9092")
        .build()
    )

    # Submit the job and block until it terminates.
    env.execute()
if __name__ == '__main__':
    # Send INFO-and-above log records to stdout, formatted as the bare message.
    logging.basicConfig(
        format="%(message)s",
        level=logging.INFO,
        stream=sys.stdout,
    )
    word_count()
--
This email and any attachments are proprietary and confidential and are
intended solely for the use of the individual to whom it is addressed. Any
views or opinions expressed are solely those of the author and do not
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
received this email in error, please let us know immediately by reply email
and delete it from your system. You may not use, disseminate, distribute or
copy this message nor disclose its contents to anyone.
SwitchDin Pty Ltd
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia
Re: pyflink mixed with Java operators
Posted by Francis Conroy <fr...@switchdin.com>.
Thanks for this, Dian. I'll give that a try.
On Mon, 10 Jan 2022 at 22:51, Dian Fu <di...@gmail.com> wrote:
> Hi,
>
> You could try the following method:
>
> ```
> from pyflink.java_gateway import get_gateway
>
> jvm = get_gateway().jvm
> ds = (
> DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
> )
> ```
>
> Regards,
> Dian
>
> On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy <
> francis.conroy@switchdin.com> wrote:
>
>> Hi all,
>>
>> Does anyone know if it's possible to specify a java map function at some
>> intermediate point in a pyflink job? In this case
>>
>> SimpleCountMeasurementsPerUUID
>>
>> is a flink java MapFunction. The reason we want to do this is that
>> performance in pyflink seems quite poor.
>> e.g.
>>
>> import logging
>> import os
>> import sys
>> import zlib
>>
>> import Measurements_pb2
>> from pyflink.common import Types
>> from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
>> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
>> CheckpointingMode
>> from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink
>>
>> from functions.common import KeyByUUID
>> from functions.file_lister import auto_load_python_files
>> from customisations.serialisation import ZlibDeserializationSchema
>>
>>
>> class ZlibDecompressor(MapFunction):
>> def map(self, value):
>> decomp = zlib.decompress(value[1])
>> return value[0], decomp
>>
>>
>> class MeasurementSnapshotCountMapFunction(MapFunction):
>> def map(self, value):
>> pb_body = Measurements_pb2.MeasurementSnapshot()
>> pb_body.ParseFromString(value)
>> meas_count = len(pb_body.measurements)
>> if meas_count > 0:
>> first_measurement = pb_body.measurements[0]
>> point_uuid = first_measurement.point_uuid.value
>> timestamp = first_measurement.time
>>
>> return timestamp, point_uuid, meas_count
>>
>> return None
>>
>>
>> def word_count():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
>> env.add_jars(jarpath)
>> auto_load_python_files(env)
>> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>> # write all the data to one file
>> env.set_parallelism(1)
>> env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
>>
>> connection_config = RMQConnectionConfig.Builder() \
>> .set_host("rabbitmq") \
>> .set_port(5672) \
>> .set_virtual_host("/") \
>> .set_user_name("guest") \
>> .set_password("guest") \
>> .set_connection_timeout(60) \
>> .set_prefetch_count(5000) \
>> .build()
>>
>> deserialization_schema = ZlibDeserializationSchema()
>>
>> stream = env.add_source(RMQSource(
>> connection_config,
>> "flink-test",
>> False,
>> deserialization_schema,
>> )).set_parallelism(1)
>>
>> # compute word count
>> dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
>> .key_by(KeyByUUID(), key_type=Types.STRING()) \
>> .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID") # Hypothetical
>>
>> kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
>> .set_value_serialization_schema(SimpleStringSchema()) \
>> .set_topic("flink-test-kafka") \
>> .build()
>>
>> dstream.sink_to(
>> KafkaSink.builder() \
>> .set_record_serializer(kafka_serialisation_schema) \
>> .set_bootstrap_servers("kafka:9092") \
>> .build()
>> )
>>
>> # submit for execution
>> env.execute()
>>
>>
>> if __name__ == '__main__':
>> logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>> word_count()
>>
>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>
--
This email and any attachments are proprietary and confidential and are
intended solely for the use of the individual to whom it is addressed. Any
views or opinions expressed are solely those of the author and do not
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
received this email in error, please let us know immediately by reply email
and delete it from your system. You may not use, disseminate, distribute or
copy this message nor disclose its contents to anyone.
SwitchDin Pty Ltd
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia
Re: pyflink mixed with Java operators
Posted by Dian Fu <di...@gmail.com>.
Hi,
You could try the following method:
```
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

# ``ds`` is an existing PyFlink DataStream. ``com.example.MyJavaMapFunction``
# is a placeholder for a Java MapFunction class that must be loadable by the
# JVM (e.g. its jar added via ``env.add_jars``).
jvm = get_gateway().jvm
# Apply the Java map to the underlying Java stream, then wrap the Java result
# back into a Python DataStream so the rest of the PyFlink pipeline can use it.
ds = (
    DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
)
```
Regards,
Dian
On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy <fr...@switchdin.com>
wrote:
> Hi all,
>
> Does anyone know if it's possible to specify a java map function at some
> intermediate point in a pyflink job? In this case
>
> SimpleCountMeasurementsPerUUID
>
> is a flink java MapFunction. The reason we want to do this is that
> performance in pyflink seems quite poor.
> e.g.
>
> import logging
> import os
> import sys
> import zlib
>
> import Measurements_pb2
> from pyflink.common import Types
> from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
> CheckpointingMode
> from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink
>
> from functions.common import KeyByUUID
> from functions.file_lister import auto_load_python_files
> from customisations.serialisation import ZlibDeserializationSchema
>
>
> class ZlibDecompressor(MapFunction):
> def map(self, value):
> decomp = zlib.decompress(value[1])
> return value[0], decomp
>
>
> class MeasurementSnapshotCountMapFunction(MapFunction):
> def map(self, value):
> pb_body = Measurements_pb2.MeasurementSnapshot()
> pb_body.ParseFromString(value)
> meas_count = len(pb_body.measurements)
> if meas_count > 0:
> first_measurement = pb_body.measurements[0]
> point_uuid = first_measurement.point_uuid.value
> timestamp = first_measurement.time
>
> return timestamp, point_uuid, meas_count
>
> return None
>
>
> def word_count():
> env = StreamExecutionEnvironment.get_execution_environment()
> jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
> env.add_jars(jarpath)
> auto_load_python_files(env)
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
> # write all the data to one file
> env.set_parallelism(1)
> env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
>
> connection_config = RMQConnectionConfig.Builder() \
> .set_host("rabbitmq") \
> .set_port(5672) \
> .set_virtual_host("/") \
> .set_user_name("guest") \
> .set_password("guest") \
> .set_connection_timeout(60) \
> .set_prefetch_count(5000) \
> .build()
>
> deserialization_schema = ZlibDeserializationSchema()
>
> stream = env.add_source(RMQSource(
> connection_config,
> "flink-test",
> False,
> deserialization_schema,
> )).set_parallelism(1)
>
> # compute word count
> dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
> .key_by(KeyByUUID(), key_type=Types.STRING()) \
> .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID") # Hypothetical
>
> kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
> .set_value_serialization_schema(SimpleStringSchema()) \
> .set_topic("flink-test-kafka") \
> .build()
>
> dstream.sink_to(
> KafkaSink.builder() \
> .set_record_serializer(kafka_serialisation_schema) \
> .set_bootstrap_servers("kafka:9092") \
> .build()
> )
>
> # submit for execution
> env.execute()
>
>
> if __name__ == '__main__':
> logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
> word_count()
>
>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>