Posted to user@flink.apache.org by Francis Conroy <fr...@switchdin.com> on 2022/01/07 05:00:24 UTC

pyflink mixed with Java operators

Hi all,

Does anyone know if it's possible to specify a Java map function at some
intermediate point in a PyFlink job? In this case,

SimpleCountMeasurementsPerUUID

is a Flink Java MapFunction. The reason we want to do this is that
performance in PyFlink seems quite poor.
e.g.

import logging
import os
import sys
import zlib

import Measurements_pb2
from pyflink.common import Types
from pyflink.common.serialization import KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, MapFunction, RuntimeContext, \
    CheckpointingMode
from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, KafkaSink

from functions.common import KeyByUUID
from functions.file_lister import auto_load_python_files
from customisations.serialisation import ZlibDeserializationSchema


class ZlibDecompressor(MapFunction):
    """Pass value[0] through unchanged and zlib-decompress value[1]."""

    def map(self, value):
        decomp = zlib.decompress(value[1])
        return value[0], decomp


class MeasurementSnapshotCountMapFunction(MapFunction):
    """Parse a protobuf MeasurementSnapshot and emit
    (timestamp, point_uuid, measurement_count), or None when empty."""

    def map(self, value):
        pb_body = Measurements_pb2.MeasurementSnapshot()
        pb_body.ParseFromString(value)
        meas_count = len(pb_body.measurements)
        if meas_count > 0:
            first_measurement = pb_body.measurements[0]
            point_uuid = first_measurement.point_uuid.value
            timestamp = first_measurement.time

            return timestamp, point_uuid, meas_count

        return None


def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    jarpath = f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
    env.add_jars(jarpath)
    auto_load_python_files(env)
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # run the whole pipeline at parallelism 1
    env.set_parallelism(1)
    env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("rabbitmq") \
        .set_port(5672) \
        .set_virtual_host("/") \
        .set_user_name("guest") \
        .set_password("guest") \
        .set_connection_timeout(60) \
        .set_prefetch_count(5000) \
        .build()

    deserialization_schema = ZlibDeserializationSchema()

    stream = env.add_source(RMQSource(
        connection_config,
        "flink-test",
        False,
        deserialization_schema,
    )).set_parallelism(1)

    # count measurements per point UUID
    dstream = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
        .key_by(KeyByUUID(), key_type=Types.STRING()) \
        .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # Hypothetical

    kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
        .set_value_serialization_schema(SimpleStringSchema()) \
        .set_topic("flink-test-kafka") \
        .build()

    dstream.sink_to(
        KafkaSink.builder() \
            .set_record_serializer(kafka_serialisation_schema) \
            .set_bootstrap_servers("kafka:9092") \
            .build()
    )

    # submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()


Re: pyflink mixed with Java operators

Posted by Francis Conroy <fr...@switchdin.com>.
Thanks for this Dian. I'll give that a try.

On Mon, 10 Jan 2022 at 22:51, Dian Fu <di...@gmail.com> wrote:

> Hi,
>
> You could try the following method:
>
> ```
> from pyflink.datastream import DataStream
> from pyflink.java_gateway import get_gateway
>
> jvm = get_gateway().jvm
> ds = (
>     DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
> )
> ```
>
> Regards,
> Dian
>
> On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy <
> francis.conroy@switchdin.com> wrote:
>
>> Hi all,
>>
>> Does anyone know if it's possible to specify a java map function at some
>> intermediate point in a pyflink job? In this case
>>
>> SimpleCountMeasurementsPerUUID
>>
>> is a flink java MapFunction. The reason we want to do this is that
>> performance in pyflink seems quite poor.


Re: pyflink mixed with Java operators

Posted by Dian Fu <di...@gmail.com>.
Hi,

You could try the following method:

```
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm
ds = (
    DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
)
```
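
For your job specifically, here is a rough, untested sketch of the same idea. It assumes that org.switchdin.operators.SimpleCountMeasurementsPerUUID implements org.apache.flink.api.common.functions.MapFunction, has a no-arg constructor, and is in the jar you already register via env.add_jars; note also that _j_data_stream is an internal attribute rather than public API, so it may change between PyFlink versions:

```
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm

# Build the keyed stream exactly as in your job.
keyed = stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
    .key_by(KeyByUUID(), key_type=Types.STRING())

# Instantiate the Java MapFunction through the Py4J gateway, apply it on the
# underlying Java stream, then wrap the result back into a Python DataStream
# so the Kafka sink can stay in Python.
j_mapped = keyed._j_data_stream.map(
    jvm.org.switchdin.operators.SimpleCountMeasurementsPerUUID())
dstream = DataStream(j_mapped)
```

The map itself then executes entirely inside the JVM, so records for that operator are not shipped through the Python worker, which should help with the performance problem you mentioned.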

Regards,
Dian

On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy <fr...@switchdin.com>
wrote:

> Hi all,
>
> Does anyone know if it's possible to specify a java map function at some
> intermediate point in a pyflink job? In this case
>
> SimpleCountMeasurementsPerUUID
>
> is a flink java MapFunction. The reason we want to do this is that
> performance in pyflink seems quite poor.