You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zixuan Rao (Jira)" <ji...@apache.org> on 2021/01/29 21:33:00 UTC

[jira] [Created] (FLINK-21211) Looking for reviews on a framework based on flink-statefun

Zixuan Rao created FLINK-21211:
----------------------------------

             Summary: Looking for reviews on a framework based on flink-statefun 
                 Key: FLINK-21211
                 URL: https://issues.apache.org/jira/browse/FLINK-21211
             Project: Flink
          Issue Type: New Feature
            Reporter: Zixuan Rao


Hi, I am currently developing a framework targeting back-end state management. To ensure exactly-once processing of events in the back end, I intend to use the Flink Stateful Functions runtime in combination with Python's asyncio. I would appreciate any feedback.

The following code shows a draft example of a back-end microservice written with my framework. It is intended to be equivalent to (interchangeable with) the flink-statefun ridesharing example (statefun-examples/ridesharing). The idea is that an "Event" is reducible to an async function call, and external egress can be emitted by saving an object. This preserves the exactly-once guarantees of flink-statefun while making the code considerably more readable.

Reviews are appreciated. Thank you! 

```python3
"""
Equivalent implementation for flink stateful functions example - ridesharing
ref: https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/FnDriver.java
"""
from onto.models.base import Serializable

"""
Rewrite callback-style code to async-await: 
ref: https://www.coreycleary.me/how-to-rewrite-a-callback-function-in-promise-form-and-async-await-form-in-javascript 
"""

from onto.domain_model import DomainModel
from onto.attrs import attrs


class RideshareBase(DomainModel):
    """Common base class for the domain models of this ridesharing example."""
    pass


class Passenger(RideshareBase):
    """Passenger side of the ridesharing example.

    Each ``async`` method handles one event addressed to the passenger.
    Outbound notifications are produced by building a ``PassengerMessage``
    with exactly one of its optional payloads set and calling ``save()``.
    """

    async def request_ride(self, start_geo_cell, end_geo_cell):
        """Create a new ride and join it as its passenger.

        :param start_geo_cell: geo cell where the passenger wants pickup
        :param end_geo_cell: geo cell where the ride should end
        """
        r = Ride.create()  # TODO: implement create
        await r.passenger_joins(
            passenger=self,
            start_geo_cell=start_geo_cell,
            end_geo_cell=end_geo_cell
        )

    class PassengerMessage(DomainModel):
        """Outbound message to a passenger; carries one optional payload."""

        passenger = attrs.relation('Passenger')

        class RideFailedMessage(Serializable):
            ride = attrs.relation('Ride')

        ride_failed = attrs.embed(RideFailedMessage).optional

        class DriverHasBeenFoundMessage(Serializable):
            driver = attrs.relation('Driver')
            driver_geo_cell = attrs.relation('GeoCell')

        # Must embed DriverHasBeenFoundMessage: that is the payload type
        # Passenger.driver_joins_ride() stores in this slot.
        driver_found = attrs.embed(DriverHasBeenFoundMessage).optional

        class RideHasStarted(Serializable):
            driver = attrs.relation('Driver')

        ride_started = attrs.embed(RideHasStarted).optional

        class RideHasEnded(Serializable):
            pass  # TODO: make sure that empty class works

        ride_ended = attrs.embed(RideHasEnded).optional

    async def ride_failed(self, ride: 'Ride'):
        """Notify the passenger that no driver could be found for ``ride``."""
        message = self.PassengerMessage.new(
            passenger=self,
            ride_failed=self.PassengerMessage.RideFailedMessage.new(
                ride=ride
            )
        )
        message.save()

    async def driver_joins_ride(self, driver: 'Driver', driver_geo_cell: 'GeoCell'):
        """Notify the passenger that ``driver`` accepted the ride.

        :param driver: the driver that joined the ride
        :param driver_geo_cell: the geo cell the driver reported
        """
        message = self.PassengerMessage.new(
            passenger=self,
            driver_found=self.PassengerMessage.DriverHasBeenFoundMessage.new(
                driver=driver,
                driver_geo_cell=driver_geo_cell
            )
        )
        message.save()

    async def ride_started(self, driver: 'Driver'):
        """Notify the passenger that the ride with ``driver`` has started."""
        message = self.PassengerMessage.new(
            passenger=self,
            ride_started=self.PassengerMessage.RideHasStarted.new(
                driver=driver
            )
        )
        message.save()

    async def ride_ended(self):
        """Notify the passenger that the ride has ended."""
        message = self.PassengerMessage.new(
            passenger=self,
            # The RideHasEnded payload belongs in the ``ride_ended`` slot,
            # not in ``ride_started``.
            ride_ended=self.PassengerMessage.RideHasEnded.new()
        )
        message.save()


class DriverRejectsPickupError(RideshareBase, Exception):
    """Raised by ``Driver.pickup_passenger`` when the driver is already taken."""
    # The driver that rejected the pickup.
    driver = attrs.relation(dm_cls='Driver')
    # The ride whose pickup request was rejected.
    ride = attrs.relation(dm_cls='Ride')


class Driver(RideshareBase):
    """Driver side of the ridesharing example.

    Holds the ride the driver is currently serving (if any) and the geo
    cell the driver last reported.
    """

    is_taken: bool = attrs.required
    current_ride = attrs.relation(dm_cls='Ride').optional
    current_location: 'GeoCell' = attrs.relation(dm_cls='GeoCell')

    @is_taken.getter
    def is_taken(self):
        """Return True while the driver is assigned to a ride."""
        # TODO: make better
        return self.current_ride is not None

    async def pickup_passenger(self, ride: 'Ride', passenger: Passenger,
                               passenger_start_cell: 'GeoCell',
                               passenger_end_cell: 'GeoCell'):
        """Accept ``ride`` and tell the physical driver to pick up the passenger.

        :param ride: the ride requesting this driver
        :param passenger: the passenger to pick up (kept for callers; the
            outbound message identifies the pickup by ``ride``)
        :param passenger_start_cell: geo cell where the ride starts
        :param passenger_end_cell: geo cell where the ride ends
        :raises DriverRejectsPickupError: if the driver is already taken
        """
        if self.is_taken:
            raise DriverRejectsPickupError(driver=self, ride=ride)
        self.current_ride = ride

        # A taken driver must no longer be offered by its current geo cell.
        if geo_cell := self.current_location:
            await geo_cell.leave_cell(driver=self)

        await ride.driver_joins(driver=self, driver_location=self.current_location)

        message = self.DriverMessage.new(
            driver=self,
            pickup_passenger=self.DriverMessage.PickupPassengerMessage.new(
                # PickupPassengerMessage declares ``ride``, not ``passenger``.
                ride=ride,
                start_geo_location=passenger_start_cell,
                end_geo_location=passenger_end_cell
            )
        )
        message.save()

    class DriverMessage(RideshareBase):
        """Outbound message to a driver."""

        driver = attrs.relation('Driver')

        class PickupPassengerMessage(Serializable):
            ride = attrs.relation('Ride')  # TODO: maybe passenger_id
            start_geo_location = attrs.relation('GeoCell')
            end_geo_location = attrs.relation('GeoCell')

        pickup_passenger = attrs.embed(PickupPassengerMessage)

    async def ride_has_started(self):
        """Forward the "ride started" event to the current ride."""
        await self.current_ride.ride_started(driver=self, driver_geo_cell=self.current_location)

    async def ride_has_ended(self):
        """Forward the "ride ended" event to the current ride and free the driver."""
        await self.current_ride.ride_ended()
        # Without clearing the ride, ``is_taken`` would stay True forever.
        self.current_ride = None
        # pickup_passenger() removed the driver from its cell; register it
        # as available again at the last known location.
        if geo_cell := self.current_location:
            await geo_cell.join_cell(driver=self)

    async def location_is_updated(self, current_geo_cell: 'GeoCell'):
        """Record a new location; join the cell on the first update only.

        :param current_geo_cell: the geo cell the driver is now in
        """
        # TODO: maybe switch to embed:     final int updated = locationUpdate.getLocationUpdate().getCurrentGeoCell();
        updated = current_geo_cell
        last = self.current_location
        if last is None:
            # First location report: register with the geo cell.
            self.current_location = updated
            await updated.join_cell(driver=self)
            return
        if last == updated:
            return
        self.current_location = updated


class Ride(RideshareBase):
    """A ride linking one passenger with (eventually) one driver."""

    passenger = attrs.relation(dm_cls=Passenger)
    driver = attrs.relation(dm_cls=Driver)

    async def ride_started(self, driver: Driver, driver_geo_cell: 'GeoCell'):
        """Tell the passenger that the ride has started.

        :param driver: the driver that started the ride
        :param driver_geo_cell: the driver's geo cell (currently unused)
        """
        await self.passenger.ride_started(driver=driver)

    async def passenger_joins(
            self,
            passenger: Passenger,
            start_geo_cell: 'GeoCell',
            end_geo_cell: 'GeoCell'
    ):
        """Attach ``passenger`` and try to find a driver in the start cell.

        Makes up to ``MAX_RETRY`` attempts; if none succeeds the passenger
        is notified via ``ride_failed``.

        :param passenger: the passenger requesting the ride
        :param start_geo_cell: geo cell where drivers are looked up
        :param end_geo_cell: geo cell where the ride should end
        """
        self.passenger = passenger
        MAX_RETRY = 5
        # Ref: https://stackoverflow.com/a/7663441
        for trial in range(MAX_RETRY):
            # get_driver() is a coroutine function and must be awaited;
            # an un-awaited coroutine object is always truthy.
            if driver := await start_geo_cell.get_driver():
                try:
                    await driver.pickup_passenger(
                        ride=self,
                        passenger=passenger,
                        passenger_start_cell=start_geo_cell,
                        passenger_end_cell=end_geo_cell
                    )
                except DriverRejectsPickupError:
                    # TODO: NOTE difference from java impl
                    #   final int startGeoCell = passenger.get().getStartGeoCell();
                    #   String cellKey = String.valueOf(startGeoCell);
                    #   context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance());
                    continue  # to retry
                else:
                    break
        else:
            # Loop finished without ``break``: no driver accepted the ride.
            await passenger.ride_failed(ride=self)

    async def driver_joins(self, driver, driver_location):
        """Record the driver and notify the passenger that one was found."""
        self.driver = driver
        await self.passenger.driver_joins_ride(driver=driver, driver_geo_cell=driver_location)

    async def ride_ended(self, ):
        """Notify the passenger that the ride ended and clear both parties."""
        await self.passenger.ride_ended()
        self.passenger = None
        self.driver = None


class GeoCell(RideshareBase):
    """A geographic cell that keeps the list of drivers registered in it."""

    drivers: list = attrs.list(
        value=attrs.relation(dm_cls=Driver)
    )

    async def get_driver(self) -> Driver:
        """Return the first registered driver, or None if there is none."""
        # Truthiness also covers ``drivers`` being None (see add_driver).
        if self.drivers:
            return self.drivers[0]
        return None

    async def leave_cell(self, driver: Driver):
        """Remove ``driver`` from this cell; do nothing if it is not listed.

        A driver's ``current_location`` can change without the driver being
        registered in the new cell, so absence here is not an error.
        """
        if self.drivers and driver in self.drivers:
            self.drivers.remove(driver)

    async def add_driver(self, driver: Driver = None):
        """Register ``driver`` as available in this cell.

        :param driver: the driver instance joining the cell
        :raises ValueError: if no driver is given
        """
        # TODO: mutated local variable vs mutated instance state;
        #  may cause difference in behavior
        if driver is None:
            raise ValueError("add_driver() requires the driver that joins the cell")
        if self.drivers is None:
            self.drivers = list()
        # Store the driver instance (not the Driver class).
        self.drivers.append(driver)

    join_cell = add_driver

```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)