You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zixuan Rao (Jira)" <ji...@apache.org> on 2021/01/29 21:35:00 UTC

[jira] [Updated] (FLINK-21211) Looking for reviews on a framework based on flink-statefun

     [ https://issues.apache.org/jira/browse/FLINK-21211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zixuan Rao updated FLINK-21211:
-------------------------------
    Description: 
Hi, I am currently developing a framework for back-end state management. To ensure exactly-once processing of events in the back end, I intend to use the Flink Stateful Functions runtime in combination with Python's asyncio. I would appreciate any feedback.

The following code is a draft example of a back-end microservice written with my framework. It is intended to be equivalent to (interchangeable with) the Flink Stateful Functions ridesharing example (statefun-examples/statefun-ridesharing-example). The idea is that each "Event" reduces to an async function call, and external egress is emitted by saving an object. This preserves the exactly-once guarantees of Flink StateFun while making the code considerably more readable.

Reviews are appreciated. Thank you! 
{code}
"""
Equivalent implementation for flink stateful functions example - ridesharing
ref: https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/FnDriver.java
"""
from onto.models.base import Serializable

"""
Rewrite callback-style code to async-await: 
ref: https://www.coreycleary.me/how-to-rewrite-a-callback-function-in-promise-form-and-async-await-form-in-javascript 
"""

from onto.domain_model import DomainModel
from onto.attrs import attrs


class RideshareBase(DomainModel):
    """Common base class shared by every domain model of the ridesharing example."""


class Passenger(RideshareBase):
    """A rider who requests rides and is notified about their progress.

    Every notification coroutine below (``ride_failed``, ``driver_joins_ride``,
    ``ride_started``, ``ride_ended``) builds one ``PassengerMessage`` addressed
    to this passenger, with exactly one payload field set, and saves it. In this
    framework's design, saving an object is how external egress is emitted.
    """

    async def request_ride(self, start_geo_cell, end_geo_cell):
        """Create a new ride and ask it to find a driver for this passenger.

        :param start_geo_cell: GeoCell where the passenger wants to be picked up.
        :param end_geo_cell: GeoCell the passenger wants to travel to.
        """
        r = Ride.create()  # TODO: implement create
        await r.passenger_joins(
            passenger=self,
            start_geo_cell=start_geo_cell,
            end_geo_cell=end_geo_cell
        )

    class PassengerMessage(DomainModel):
        """Egress message for a passenger; one optional payload is set per message."""
        passenger = attrs.relation('Passenger')

        class RideFailedMessage(Serializable):
            ride = attrs.relation('Ride')

        ride_failed = attrs.embed(RideFailedMessage).optional

        class DriverHasBeenFoundMessage(Serializable):
            driver = attrs.relation('Driver')
            driver_geo_cell = attrs.relation('GeoCell')

        # Must embed DriverHasBeenFoundMessage: driver_joins_ride() stores an
        # instance of that type here (it previously declared RideFailedMessage).
        driver_found = attrs.embed(DriverHasBeenFoundMessage).optional

        class RideHasStarted(Serializable):
            driver = attrs.relation('Driver')

        ride_started = attrs.embed(RideHasStarted).optional

        class RideHasEnded(Serializable):
            pass  # TODO: make sure that empty class works

        ride_ended = attrs.embed(RideHasEnded).optional

    def _emit_message(self, **payload):
        """Build a PassengerMessage for this passenger carrying ``payload`` and save it.

        Centralising construction keeps each notification down to choosing the
        right payload field name.
        """
        message = self.PassengerMessage.new(passenger=self, **payload)
        message.save()

    async def ride_failed(self, ride: 'Ride'):
        """Notify the passenger that ``ride`` could not be matched with a driver."""
        self._emit_message(
            ride_failed=self.PassengerMessage.RideFailedMessage.new(
                ride=ride
            )
        )

    async def driver_joins_ride(self, driver: 'Driver', driver_geo_cell: 'GeoCell'):
        """Notify the passenger that ``driver`` (currently in ``driver_geo_cell``) was found."""
        self._emit_message(
            driver_found=self.PassengerMessage.DriverHasBeenFoundMessage.new(
                driver=driver,
                driver_geo_cell=driver_geo_cell
            )
        )

    async def ride_started(self, driver: 'Driver'):
        """Notify the passenger that ``driver`` has started the ride."""
        self._emit_message(
            ride_started=self.PassengerMessage.RideHasStarted.new(
                driver=driver
            )
        )

    async def ride_ended(self):
        """Notify the passenger that the ride has ended."""
        # Use the ``ride_ended`` field; this previously set ``ride_started``
        # with a RideHasEnded payload, so the message had the wrong field set.
        self._emit_message(
            ride_ended=self.PassengerMessage.RideHasEnded.new()
        )


class DriverRejectsPickupError(RideshareBase, Exception):
    """Raised by ``Driver.pickup_passenger`` when the driver is already on a ride.

    ``Ride.passenger_joins`` catches it and retries the driver lookup.
    """
    driver = attrs.relation(dm_cls='Driver')  # the driver that refused the pickup
    ride = attrs.relation(dm_cls='Ride')  # the ride whose pickup was refused


class Driver(RideshareBase):
    """A driver that accepts at most one ride at a time and reports its location.

    While free, the driver is registered in the ``drivers`` list of the GeoCell
    it first reported. It leaves that cell when it accepts a pickup and re-joins
    it when the ride ends.
    """
    is_taken: bool = attrs.required
    current_ride = attrs.relation(dm_cls='Ride').optional
    current_location: 'GeoCell' = attrs.relation(dm_cls='GeoCell')

    @is_taken.getter
    def is_taken(self):
        """Return True while the driver is assigned to a ride."""
        # TODO: make better
        return self.current_ride is not None

    async def pickup_passenger(self, ride: 'Ride', passenger: Passenger,
                               passenger_start_cell: 'GeoCell',
                               passenger_end_cell: 'GeoCell'):
        """Accept ``ride`` and emit a pickup instruction for this driver.

        :param ride: the ride asking for a pickup.
        :param passenger: the passenger to pick up (kept for interface
            compatibility; the emitted message identifies the ride).
        :param passenger_start_cell: where the passenger will be picked up.
        :param passenger_end_cell: where the passenger wants to go.
        :raises DriverRejectsPickupError: if the driver is already on a ride.
        """
        if self.is_taken:
            raise DriverRejectsPickupError(driver=self, ride=ride)
        self.current_ride = ride

        # A taken driver must not be handed out again by its geo cell,
        # so unregister from the cell we currently belong to.
        if (geo_cell := self.current_location) is not None:
            await geo_cell.leave_cell(driver=self)

        await ride.driver_joins(driver=self, driver_location=self.current_location)

        # PickupPassengerMessage declares ``ride`` (not ``passenger``), so pass
        # the ride; passing an undeclared ``passenger`` field was a bug.
        message = self.DriverMessage.new(
            driver=self,
            pickup_passenger=self.DriverMessage.PickupPassengerMessage.new(
                ride=ride,
                start_geo_location=passenger_start_cell,
                end_geo_location=passenger_end_cell
            )
        )
        message.save()

    class DriverMessage(RideshareBase):
        """Egress message addressed to this driver."""
        driver = attrs.relation('Driver')

        class PickupPassengerMessage(Serializable):
            ride = attrs.relation('Ride')  # TODO: maybe passenger_id
            start_geo_location = attrs.relation('GeoCell')
            end_geo_location = attrs.relation('GeoCell')

        pickup_passenger = attrs.embed(PickupPassengerMessage)

    async def ride_has_started(self):
        """Forward "ride started" to the current ride, with the driver's location."""
        await self.current_ride.ride_started(driver=self, driver_geo_cell=self.current_location)

    async def ride_has_ended(self):
        """Finish the current ride and make the driver available again.

        The current ride must be cleared, otherwise ``is_taken`` stays True and
        every later pickup is rejected. The driver left its geo cell in
        ``pickup_passenger``, so it re-joins that cell here.
        """
        ride = self.current_ride
        await ride.ride_ended()
        self.current_ride = None
        if (geo_cell := self.current_location) is not None:
            await geo_cell.join_cell(driver=self)

    async def location_is_updated(self, current_geo_cell: 'GeoCell'):
        """Record the driver's latest geo cell.

        On the first update the driver also registers itself with that cell.
        Later updates only move ``current_location``.
        NOTE(review): a free driver that changes cells stays in the old cell's
        ``drivers`` list and is not added to the new one; confirm this is intended.
        """
        # TODO: maybe switch to embed:     final int updated = locationUpdate.getLocationUpdate().getCurrentGeoCell();
        last = self.current_location
        if last is not None and last == current_geo_cell:
            return  # no movement, nothing to do
        self.current_location = current_geo_cell
        if last is None:
            # Pass ourselves so the cell records *this* driver.
            await current_geo_cell.join_cell(driver=self)


class Ride(RideshareBase):
    """A single ride: matches a passenger with a driver and relays ride events."""
    passenger = attrs.relation(dm_cls=Passenger)
    driver = attrs.relation(dm_cls=Driver)

    async def ride_started(self, driver: Driver, driver_geo_cell: 'GeoCell'):
        """Tell the passenger that ``driver`` started the ride (``driver_geo_cell`` is unused)."""
        await self.passenger.ride_started(driver=driver)

    async def passenger_joins(
            self,
            passenger: Passenger,
            start_geo_cell: 'GeoCell',
            end_geo_cell: 'GeoCell'
    ):
        """Attach ``passenger`` to this ride and try to find a driver.

        Asks ``start_geo_cell`` for a driver and requests a pickup. It retries
        up to ``MAX_RETRY`` times when the driver rejects. If the cell has no
        driver, or every attempt is rejected, the passenger is told the ride
        failed.

        NOTE(review): ``get_driver`` always returns ``drivers[0]``, and a
        rejecting driver is not removed from the cell, so retries may keep
        asking the same driver.
        """
        self.passenger = passenger
        MAX_RETRY = 5
        for _ in range(MAX_RETRY):
            # get_driver is a coroutine and must be awaited. Otherwise the
            # (always truthy) coroutine object would be used as the driver.
            driver = await start_geo_cell.get_driver()
            if driver is None:
                break  # no driver in this cell: retrying cannot help
            try:
                await driver.pickup_passenger(
                    ride=self,
                    passenger=passenger,
                    passenger_start_cell=start_geo_cell,
                    passenger_end_cell=end_geo_cell
                )
            except DriverRejectsPickupError:
                # TODO: NOTE difference from java impl, which re-sends GetDriver:
                #   final int startGeoCell = passenger.get().getStartGeoCell();
                #   String cellKey = String.valueOf(startGeoCell);
                #   context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance());
                continue  # to retry
            return  # pickup accepted
        await passenger.ride_failed(ride=self)

    async def driver_joins(self, driver, driver_location):
        """Record ``driver`` on this ride and notify the passenger where it is."""
        self.driver = driver
        await self.passenger.driver_joins_ride(driver=driver, driver_geo_cell=driver_location)

    async def ride_ended(self):
        """Notify the passenger that the ride ended, then detach both parties."""
        await self.passenger.ride_ended()
        self.passenger = None
        self.driver = None


class GeoCell(RideshareBase):
    """A geographic cell holding the list of free drivers registered in it."""

    drivers: list = attrs.list(
        value=attrs.relation(dm_cls=Driver)
    )

    async def get_driver(self) -> Driver:
        """Return the first registered driver, or None if the cell has none."""
        # ``drivers`` may still be None before the first join (add_driver
        # guards for the same case), so test truthiness instead of len().
        if self.drivers:
            return self.drivers[0]
        return None

    async def leave_cell(self, driver: Driver):
        """Unregister ``driver`` from this cell; a no-op if it is not registered.

        A driver whose location changed after its first update is not in its
        new cell's list, yet ``pickup_passenger`` still calls this. So a missing
        driver must not raise.
        """
        if self.drivers and driver in self.drivers:
            self.drivers.remove(driver)

    async def add_driver(self, driver: Driver = None):
        """Register ``driver`` as available in this cell (idempotent).

        :param driver: the joining driver. Required; the default exists only so
            legacy no-argument calls fail with a clear message.
        :raises TypeError: if no driver is given.
        """
        if driver is None:
            # The old no-argument form appended the Driver *class* itself,
            # which corrupted the list; the caller must say who is joining.
            raise TypeError("add_driver() requires the joining driver")
        # TODO: mutated local variable vs mutated instance state;
        #  may cause difference in behavior
        if self.drivers is None:
            self.drivers = list()
        if driver not in self.drivers:
            self.drivers.append(driver)

    join_cell = add_driver

{code}

  was:
Hi, I am currently developing a framework for back-end state management. To ensure exactly-once processing of events in the back end, I intend to use the Flink Stateful Functions runtime in combination with Python's asyncio. I would appreciate any feedback.

The following code is a draft example of a back-end microservice written with my framework. It is intended to be equivalent to (interchangeable with) the Flink Stateful Functions ridesharing example (statefun-examples/statefun-ridesharing-example). The idea is that each "Event" reduces to an async function call, and external egress is emitted by saving an object. This preserves the exactly-once guarantees of Flink StateFun while making the code considerably more readable.

Reviews are appreciated. Thank you! 

```python3
"""
Equivalent implementation for flink stateful functions example - ridesharing
ref: https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/FnDriver.java
"""
from onto.models.base import Serializable

"""
Rewrite callback-style code to async-await: 
ref: https://www.coreycleary.me/how-to-rewrite-a-callback-function-in-promise-form-and-async-await-form-in-javascript 
"""

from onto.domain_model import DomainModel
from onto.attrs import attrs


class RideshareBase(DomainModel):
    """Common base class shared by every domain model of the ridesharing example."""


class Passenger(RideshareBase):
    """A rider who requests rides and is notified about their progress.

    Every notification coroutine below (``ride_failed``, ``driver_joins_ride``,
    ``ride_started``, ``ride_ended``) builds one ``PassengerMessage`` addressed
    to this passenger, with exactly one payload field set, and saves it. In this
    framework's design, saving an object is how external egress is emitted.
    """

    async def request_ride(self, start_geo_cell, end_geo_cell):
        """Create a new ride and ask it to find a driver for this passenger.

        :param start_geo_cell: GeoCell where the passenger wants to be picked up.
        :param end_geo_cell: GeoCell the passenger wants to travel to.
        """
        r = Ride.create()  # TODO: implement create
        await r.passenger_joins(
            passenger=self,
            start_geo_cell=start_geo_cell,
            end_geo_cell=end_geo_cell
        )

    class PassengerMessage(DomainModel):
        """Egress message for a passenger; one optional payload is set per message."""
        passenger = attrs.relation('Passenger')

        class RideFailedMessage(Serializable):
            ride = attrs.relation('Ride')

        ride_failed = attrs.embed(RideFailedMessage).optional

        class DriverHasBeenFoundMessage(Serializable):
            driver = attrs.relation('Driver')
            driver_geo_cell = attrs.relation('GeoCell')

        # Must embed DriverHasBeenFoundMessage: driver_joins_ride() stores an
        # instance of that type here (it previously declared RideFailedMessage).
        driver_found = attrs.embed(DriverHasBeenFoundMessage).optional

        class RideHasStarted(Serializable):
            driver = attrs.relation('Driver')

        ride_started = attrs.embed(RideHasStarted).optional

        class RideHasEnded(Serializable):
            pass  # TODO: make sure that empty class works

        ride_ended = attrs.embed(RideHasEnded).optional

    def _emit_message(self, **payload):
        """Build a PassengerMessage for this passenger carrying ``payload`` and save it.

        Centralising construction keeps each notification down to choosing the
        right payload field name.
        """
        message = self.PassengerMessage.new(passenger=self, **payload)
        message.save()

    async def ride_failed(self, ride: 'Ride'):
        """Notify the passenger that ``ride`` could not be matched with a driver."""
        self._emit_message(
            ride_failed=self.PassengerMessage.RideFailedMessage.new(
                ride=ride
            )
        )

    async def driver_joins_ride(self, driver: 'Driver', driver_geo_cell: 'GeoCell'):
        """Notify the passenger that ``driver`` (currently in ``driver_geo_cell``) was found."""
        self._emit_message(
            driver_found=self.PassengerMessage.DriverHasBeenFoundMessage.new(
                driver=driver,
                driver_geo_cell=driver_geo_cell
            )
        )

    async def ride_started(self, driver: 'Driver'):
        """Notify the passenger that ``driver`` has started the ride."""
        self._emit_message(
            ride_started=self.PassengerMessage.RideHasStarted.new(
                driver=driver
            )
        )

    async def ride_ended(self):
        """Notify the passenger that the ride has ended."""
        # Use the ``ride_ended`` field; this previously set ``ride_started``
        # with a RideHasEnded payload, so the message had the wrong field set.
        self._emit_message(
            ride_ended=self.PassengerMessage.RideHasEnded.new()
        )


class DriverRejectsPickupError(RideshareBase, Exception):
    """Raised by ``Driver.pickup_passenger`` when the driver is already on a ride.

    ``Ride.passenger_joins`` catches it and retries the driver lookup.
    """
    driver = attrs.relation(dm_cls='Driver')  # the driver that refused the pickup
    ride = attrs.relation(dm_cls='Ride')  # the ride whose pickup was refused


class Driver(RideshareBase):
    """A driver that accepts at most one ride at a time and reports its location.

    While free, the driver is registered in the ``drivers`` list of the GeoCell
    it first reported. It leaves that cell when it accepts a pickup and re-joins
    it when the ride ends.
    """
    is_taken: bool = attrs.required
    current_ride = attrs.relation(dm_cls='Ride').optional
    current_location: 'GeoCell' = attrs.relation(dm_cls='GeoCell')

    @is_taken.getter
    def is_taken(self):
        """Return True while the driver is assigned to a ride."""
        # TODO: make better
        return self.current_ride is not None

    async def pickup_passenger(self, ride: 'Ride', passenger: Passenger,
                               passenger_start_cell: 'GeoCell',
                               passenger_end_cell: 'GeoCell'):
        """Accept ``ride`` and emit a pickup instruction for this driver.

        :param ride: the ride asking for a pickup.
        :param passenger: the passenger to pick up (kept for interface
            compatibility; the emitted message identifies the ride).
        :param passenger_start_cell: where the passenger will be picked up.
        :param passenger_end_cell: where the passenger wants to go.
        :raises DriverRejectsPickupError: if the driver is already on a ride.
        """
        if self.is_taken:
            raise DriverRejectsPickupError(driver=self, ride=ride)
        self.current_ride = ride

        # A taken driver must not be handed out again by its geo cell,
        # so unregister from the cell we currently belong to.
        if (geo_cell := self.current_location) is not None:
            await geo_cell.leave_cell(driver=self)

        await ride.driver_joins(driver=self, driver_location=self.current_location)

        # PickupPassengerMessage declares ``ride`` (not ``passenger``), so pass
        # the ride; passing an undeclared ``passenger`` field was a bug.
        message = self.DriverMessage.new(
            driver=self,
            pickup_passenger=self.DriverMessage.PickupPassengerMessage.new(
                ride=ride,
                start_geo_location=passenger_start_cell,
                end_geo_location=passenger_end_cell
            )
        )
        message.save()

    class DriverMessage(RideshareBase):
        """Egress message addressed to this driver."""
        driver = attrs.relation('Driver')

        class PickupPassengerMessage(Serializable):
            ride = attrs.relation('Ride')  # TODO: maybe passenger_id
            start_geo_location = attrs.relation('GeoCell')
            end_geo_location = attrs.relation('GeoCell')

        pickup_passenger = attrs.embed(PickupPassengerMessage)

    async def ride_has_started(self):
        """Forward "ride started" to the current ride, with the driver's location."""
        await self.current_ride.ride_started(driver=self, driver_geo_cell=self.current_location)

    async def ride_has_ended(self):
        """Finish the current ride and make the driver available again.

        The current ride must be cleared, otherwise ``is_taken`` stays True and
        every later pickup is rejected. The driver left its geo cell in
        ``pickup_passenger``, so it re-joins that cell here.
        """
        ride = self.current_ride
        await ride.ride_ended()
        self.current_ride = None
        if (geo_cell := self.current_location) is not None:
            await geo_cell.join_cell(driver=self)

    async def location_is_updated(self, current_geo_cell: 'GeoCell'):
        """Record the driver's latest geo cell.

        On the first update the driver also registers itself with that cell.
        Later updates only move ``current_location``.
        NOTE(review): a free driver that changes cells stays in the old cell's
        ``drivers`` list and is not added to the new one; confirm this is intended.
        """
        # TODO: maybe switch to embed:     final int updated = locationUpdate.getLocationUpdate().getCurrentGeoCell();
        last = self.current_location
        if last is not None and last == current_geo_cell:
            return  # no movement, nothing to do
        self.current_location = current_geo_cell
        if last is None:
            # Pass ourselves so the cell records *this* driver.
            await current_geo_cell.join_cell(driver=self)


class Ride(RideshareBase):
    """A single ride: matches a passenger with a driver and relays ride events."""
    passenger = attrs.relation(dm_cls=Passenger)
    driver = attrs.relation(dm_cls=Driver)

    async def ride_started(self, driver: Driver, driver_geo_cell: 'GeoCell'):
        """Tell the passenger that ``driver`` started the ride (``driver_geo_cell`` is unused)."""
        await self.passenger.ride_started(driver=driver)

    async def passenger_joins(
            self,
            passenger: Passenger,
            start_geo_cell: 'GeoCell',
            end_geo_cell: 'GeoCell'
    ):
        """Attach ``passenger`` to this ride and try to find a driver.

        Asks ``start_geo_cell`` for a driver and requests a pickup. It retries
        up to ``MAX_RETRY`` times when the driver rejects. If the cell has no
        driver, or every attempt is rejected, the passenger is told the ride
        failed.

        NOTE(review): ``get_driver`` always returns ``drivers[0]``, and a
        rejecting driver is not removed from the cell, so retries may keep
        asking the same driver.
        """
        self.passenger = passenger
        MAX_RETRY = 5
        for _ in range(MAX_RETRY):
            # get_driver is a coroutine and must be awaited. Otherwise the
            # (always truthy) coroutine object would be used as the driver.
            driver = await start_geo_cell.get_driver()
            if driver is None:
                break  # no driver in this cell: retrying cannot help
            try:
                await driver.pickup_passenger(
                    ride=self,
                    passenger=passenger,
                    passenger_start_cell=start_geo_cell,
                    passenger_end_cell=end_geo_cell
                )
            except DriverRejectsPickupError:
                # TODO: NOTE difference from java impl, which re-sends GetDriver:
                #   final int startGeoCell = passenger.get().getStartGeoCell();
                #   String cellKey = String.valueOf(startGeoCell);
                #   context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance());
                continue  # to retry
            return  # pickup accepted
        await passenger.ride_failed(ride=self)

    async def driver_joins(self, driver, driver_location):
        """Record ``driver`` on this ride and notify the passenger where it is."""
        self.driver = driver
        await self.passenger.driver_joins_ride(driver=driver, driver_geo_cell=driver_location)

    async def ride_ended(self):
        """Notify the passenger that the ride ended, then detach both parties."""
        await self.passenger.ride_ended()
        self.passenger = None
        self.driver = None


class GeoCell(RideshareBase):
    """A geographic cell holding the list of free drivers registered in it."""

    drivers: list = attrs.list(
        value=attrs.relation(dm_cls=Driver)
    )

    async def get_driver(self) -> Driver:
        """Return the first registered driver, or None if the cell has none."""
        # ``drivers`` may still be None before the first join (add_driver
        # guards for the same case), so test truthiness instead of len().
        if self.drivers:
            return self.drivers[0]
        return None

    async def leave_cell(self, driver: Driver):
        """Unregister ``driver`` from this cell; a no-op if it is not registered.

        A driver whose location changed after its first update is not in its
        new cell's list, yet ``pickup_passenger`` still calls this. So a missing
        driver must not raise.
        """
        if self.drivers and driver in self.drivers:
            self.drivers.remove(driver)

    async def add_driver(self, driver: Driver = None):
        """Register ``driver`` as available in this cell (idempotent).

        :param driver: the joining driver. Required; the default exists only so
            legacy no-argument calls fail with a clear message.
        :raises TypeError: if no driver is given.
        """
        if driver is None:
            # The old no-argument form appended the Driver *class* itself,
            # which corrupted the list; the caller must say who is joining.
            raise TypeError("add_driver() requires the joining driver")
        # TODO: mutated local variable vs mutated instance state;
        #  may cause difference in behavior
        if self.drivers is None:
            self.drivers = list()
        if driver not in self.drivers:
            self.drivers.append(driver)

    join_cell = add_driver

```


> Looking for reviews on a framework based on flink-statefun 
> -----------------------------------------------------------
>
>                 Key: FLINK-21211
>                 URL: https://issues.apache.org/jira/browse/FLINK-21211
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Zixuan Rao
>            Priority: Minor
>
> Hi, I am currently developing a framework for back-end state management. To ensure exactly-once processing of events in the back end, I intend to use the Flink Stateful Functions runtime in combination with Python's asyncio. I would appreciate any feedback.
> The following code is a draft example of a back-end microservice written with my framework. It is intended to be equivalent to (interchangeable with) the Flink Stateful Functions ridesharing example (statefun-examples/statefun-ridesharing-example). The idea is that each "Event" reduces to an async function call, and external egress is emitted by saving an object. This preserves the exactly-once guarantees of Flink StateFun while making the code considerably more readable.
> Reviews are appreciated. Thank you! 
> {code}
> """
> Equivalent implementation for flink stateful functions example - ridesharing
> ref: https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/FnDriver.java
> """
> from onto.models.base import Serializable
> """
> Rewrite callback-style code to async-await: 
> ref: https://www.coreycleary.me/how-to-rewrite-a-callback-function-in-promise-form-and-async-await-form-in-javascript 
> """
> from onto.domain_model import DomainModel
> from onto.attrs import attrs
> class RideshareBase(DomainModel):
>     pass
> class Passenger(RideshareBase):
>     async def request_ride(self, start_geo_cell, end_geo_cell):
>         r = Ride.create()  # TODO: implement create
>         await r.passenger_joins(
>             passenger=self,
>             start_geo_cell=start_geo_cell,
>             end_geo_cell=end_geo_cell
>         )
>     class PassengerMessage(DomainModel):
>         passenger = attrs.relation('Passenger')
>         class RideFailedMessage(Serializable):
>             ride = attrs.relation('Ride')
>         ride_failed = attrs.embed(RideFailedMessage).optional
>         class DriverHasBeenFoundMessage(Serializable):
>             driver = attrs.relation('Driver')
>             driver_geo_cell = attrs.relation('GeoCell')
>         driver_found = attrs.embed(RideFailedMessage).optional
>         class RideHasStarted(Serializable):
>             driver = attrs.relation('Driver')
>         ride_started = attrs.embed(RideHasStarted).optional
>         class RideHasEnded(Serializable):
>             pass  # TODO: make sure that empty class works
>         ride_ended = attrs.embed(RideHasEnded).optional
>     async def ride_failed(self, ride: 'Ride'):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             ride_failed=self.PassengerMessage.RideFailedMessage.new(
>                 ride=ride
>             )
>         )
>         message.save()
>     async def driver_joins_ride(self, driver: 'Driver', driver_geo_cell: 'GeoCell'):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             driver_found=self.PassengerMessage.DriverHasBeenFoundMessage.new(
>                 driver=driver,
>                 driver_geo_cell=driver_geo_cell
>             )
>         )
>         message.save()
>     async def ride_started(self, driver: 'Driver'):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             ride_started=self.PassengerMessage.RideHasStarted.new(
>                 driver=driver
>             )
>         )
>         message.save()
>     async def ride_ended(self):
>         message = self.PassengerMessage.new(
>             passenger=self,
>             ride_started=self.PassengerMessage.RideHasEnded.new()
>         )
>         message.save()
> class DriverRejectsPickupError(RideshareBase, Exception):
>     driver = attrs.relation(dm_cls='Driver')
>     ride = attrs.relation(dm_cls='Ride')
> class Driver(RideshareBase):
>     is_taken: bool = attrs.required
>     current_ride = attrs.relation(dm_cls='Ride').optional
>     current_location: 'GeoCell' = attrs.relation(dm_cls='GeoCell')
>     @is_taken.getter
>     def is_taken(self):
>         # TODO: make better
>         return self.current_ride is not None
>     async def pickup_passenger(self, ride: 'Ride', passenger: Passenger,
>                 passenger_start_cell: 'GeoCell',
>                 passenger_end_cell: 'GeoCell'):
>         if self.is_taken:
>             raise DriverRejectsPickupError(driver=self, ride=ride)
>         self.current_ride = ride
>         # "    // We also need to unregister ourselves from the current geo cell we belong to."
>         if geo_cell := self.current_location:
>             await geo_cell.leave_cell(driver=self)
>         await ride.driver_joins(driver=self, driver_location=self.current_location)
>         message = self.DriverMessage.new(
>             driver=self,
>             pickup_passenger=self.DriverMessage.PickupPassengerMessage.new(
>                 passenger=passenger,
>                 start_geo_location=passenger_start_cell,
>                 end_geo_location=passenger_end_cell
>             )
>         )
>         message.save()
>     class DriverMessage(RideshareBase):
>         driver = attrs.relation('Driver')
>         class PickupPassengerMessage(Serializable):
>             ride = attrs.relation('Ride')  # TODO: maybe passenger_id
>             start_geo_location = attrs.relation('GeoCell')
>             end_geo_location = attrs.relation('GeoCell')
>         pickup_passenger = attrs.embed(PickupPassengerMessage)
>     async def ride_has_started(self):
>         await self.current_ride.ride_started(driver=self, driver_geo_cell=self.current_location)
>     async def ride_has_ended(self):
>         await self.current_ride.ride_ended()
>     async def location_is_updated(self, current_geo_cell: 'GeoCell'):
>         # TODO: maybe switch to embed:     final int updated = locationUpdate.getLocationUpdate().getCurrentGeoCell();
>         updated = current_geo_cell
>         last = self.current_location
>         if last is None:
>             self.current_location = updated
>             await updated.join_cell()
>             return
>         elif last == updated:
>             return
>         else:
>             self.current_location = updated
> class Ride(RideshareBase):
>     passenger = attrs.relation(dm_cls=Passenger)
>     driver = attrs.relation(dm_cls=Driver)
>     async def ride_started(self, driver: Driver, driver_geo_cell: 'GeoCell'):
>         await self.passenger.ride_started(driver=driver)
>     async def passenger_joins(
>             self,
>             passenger: Passenger,
>             start_geo_cell: 'GeoCell',
>             end_geo_cell: 'GeoCell'
>     ):
>         self.passenger = passenger
>         MAX_RETRY = 5
>         # Ref: https://stackoverflow.com/a/7663441
>         for trial in range(MAX_RETRY):
>             if driver := start_geo_cell.get_driver():
>                 try:
>                     await driver.pickup_passenger(
>                         ride=self,
>                         passenger=passenger,
>                         passenger_start_cell=start_geo_cell,
>                         passenger_end_cell=end_geo_cell
>                     )
>                 except DriverRejectsPickupError as _:
>                     # TODO: NOTE difference from java impl
>                     """
>                     final int startGeoCell = passenger.get().getStartGeoCell();
>                     String cellKey = String.valueOf(startGeoCell);
>                     context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance());
>                     """
>                     continue  # to retry
>                 else:
>                     break
>         else:
>             await passenger.ride_failed(ride=self)
>     async def driver_joins(self, driver, driver_location):
>         self.driver = driver
>         await self.passenger.driver_joins_ride(driver=driver, driver_geo_cell=driver_location)
>     async def ride_ended(self, ):
>         await self.passenger.ride_ended()
>         self.passenger = None
>         self.driver = None
> class GeoCell(RideshareBase):
>     drivers: list = attrs.list(
>         value=attrs.relation(dm_cls=Driver)
>     )
>     async def get_driver(self) -> Driver:
>         if len(self.drivers) != 0:
>             next_driver = self.drivers[0]
>             return next_driver
>         else:
>             return None
>     async def leave_cell(self, driver: Driver):
>         self.drivers.remove(driver)
>     async def add_driver(self):
>         # TODO: mutated local variable vs mutated instance state;
>         #  may cause difference in behavior
>         if self.drivers is None:
>             self.drivers = list()
>         self.drivers.append(Driver)
>     join_cell = add_driver
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)