Posted to user@beam.apache.org by yu...@oracle.com on 2021/01/27 02:55:32 UTC

Re: [External] : Re: outputReceiver.output() does not emit the result immediately

Hi Boyuan, 

Thanks for replying. We are using Beam 2.25.0 and the direct runner for testing. We are developing an unbounded streaming service connector with a splittable DoFn. In our connector.read(), we want to commit each message back to the stream after outputting the record to the downstream user pipeline. The read transform and the user pipeline look like this:
public class Connector {
    public static Connector.Read read() {
        return new AutoValue_Connector_Read.Builder()
                .setStream("")
                .setStreamPartitions(Collections.singletonList(0))
                .build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PBegin, PCollection<Record>> {
        // members used elsewhere (abstract getters, the @AutoValue.Builder, setStream(...)) are elided

        @Override
        public PCollection<Record> expand(PBegin input) {
            // a single Impulse element seeds the source descriptors
            PCollection<SourceDescriptor> output = input
                    .apply(Impulse.create())
                    .apply(ParDo.of(new GenerateSourceDescriptor(this)));

            // then apply the SDF read DoFn on it
            return output.apply(ParDo.of(new ReadDoFn(this)));
        }
    }
}

@DoFn.UnboundedPerElement
class ReadDoFn extends DoFn<SourceDescriptor, Record> {
    @ProcessElement
    public ProcessContinuation processElement(@Element SourceDescriptor sourceDescriptor,
                                              RestrictionTracker<OffsetRange, Long> tracker,
                                              OutputReceiver<Record> receiver) {
        // `cursor` (the stream read position) is declared and initialized elsewhere; elided here
        while (true) {
            List<Message> messages = getMessageFromStream(cursor);
            if (messages.isEmpty()) {
                return DoFn.ProcessContinuation.resume();
            }
            for (Message message : messages) {
                // an OffsetRange tracker claims Long offsets, not Message objects
                // (assumes Message exposes its offset via getOffset())
                if (!tracker.tryClaim(message.getOffset())) {
                    return DoFn.ProcessContinuation.stop();
                }

                Record record = new Record(message);
                // output to user pipeline
                receiver.outputWithTimestamp(record, Instant.now());
            }
            // commit this batch of messages and get the updated cursor to read the next batch
            cursor = commitMessage();
        }
    }
}

//////////////////////////////// pipeline use Connector.read() to read from stream /////////////////////////////////////

class UserPipeline {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        p.getOptions().as(StreamingOptions.class).setStreaming(true);

        PCollection<KV<String, String>> output =
                p.apply("Read Stream", Connector.read().setStream("stream1"))
                        .apply("Log Record", ParDo.of(new DoFn<Record, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(@Element Record input, OutputReceiver<KV<String, String>> out) {
                                System.out.printf("[User Pipeline] received offset %s : %s : %s%n",
                                        input.getOffset(), input.getKV().getKey(), input.getKV().getValue());
                                out.output(input.getKV());
                            }
                        }));

        // the pipeline only executes once run() is called
        p.run().waitUntilFinish();
    }
}
Since we commit each batch right after `outputReceiver.output()` and use the cursor from the commit response to fetch the next batch, buffering is a problem. If `outputReceiver.output()` does not emit immediately but buffers messages 0, 1 and 2, and the user pipeline then stops and restarts, messages 0 and 1 are lost: `outputReceiver.output()` has not emitted them yet, but the connector has already committed them.
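To make the failure mode concrete, here is a dependency-free toy model in plain Java. It is not Beam code, and every class and method name in it is hypothetical. `output()` only appends to a runner-side buffer, the connector commits right after each `output()`, and a crash before the buffer is flushed discards the buffered records while the commit survives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Beam code: output() only appends to a runner-side
// buffer, which reaches downstream when the bundle is flushed.
final class EagerCommitModel {
    final List<Long> buffered = new ArrayList<>();   // emitted but not yet delivered
    final List<Long> delivered = new ArrayList<>();  // seen by the downstream pipeline
    long committedCursor = -1;                       // last offset committed to the stream

    void output(long offset) {
        buffered.add(offset);
    }

    // The connector commits immediately after output(), as in the ReadDoFn above.
    void commit(long offset) {
        committedCursor = offset;
    }

    void flushBundle() {
        delivered.addAll(buffered);
        buffered.clear();
    }

    // Simulated worker crash: anything still buffered is discarded.
    void crash() {
        buffered.clear();
    }

    // On restart, reading resumes after committedCursor, so every committed
    // offset that was never delivered is lost.
    List<Long> lostOffsets() {
        List<Long> lost = new ArrayList<>();
        for (long o = 0; o <= committedCursor; o++) {
            if (!delivered.contains(o)) {
                lost.add(o);
            }
        }
        return lost;
    }

    static List<Long> run() {
        EagerCommitModel m = new EagerCommitModel();
        for (long offset = 0; offset <= 2; offset++) {
            m.output(offset);
            m.commit(offset);
        }
        m.crash(); // crash before the runner flushes the bundle
        return m.lostOffsets();
    }
}
```

In this toy setup all three buffered offsets end up committed but never delivered, which is the same kind of loss described above.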

Is this the expected behavior of `outputReceiver.output()`? If so, how should we commit messages (or checkpoint) in the connector so that the downstream pipeline does not lose messages when it starts over?
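One pattern that seems to fit is to defer the commit until the runner reports that the bundle's output has been durably committed. The Beam Java SDK exposes this idea as `DoFn.BundleFinalizer`, which can be requested as a `@ProcessElement` parameter and accepts a callback through `afterBundleCommit(Instant, Callback)`. Runner support for it varies, so whether the direct runner in 2.25.0 honors it would need checking. The sketch below models only the idea in plain Java, and all names in it are hypothetical: the commit is registered as a callback that runs after the bundle is flushed, so a crash leads to re-reading instead of loss.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Beam code. It mimics the idea behind
// DoFn.BundleFinalizer: the stream commit runs only after the bundle's
// output has been flushed downstream.
final class DeferredCommitModel {
    final List<Long> buffered = new ArrayList<>();
    final List<Long> delivered = new ArrayList<>();
    final List<Runnable> pendingCallbacks = new ArrayList<>();
    long committedCursor = -1;

    void output(long offset) {
        buffered.add(offset);
    }

    // Instead of committing now, register the commit to run after the bundle succeeds.
    void commitAfterBundle(long offset) {
        pendingCallbacks.add(() -> committedCursor = offset);
    }

    void flushBundle() {
        delivered.addAll(buffered);
        buffered.clear();
        // Output is now durable, so it is safe to acknowledge the stream.
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
    }

    // Simulated crash: buffered output and the failed bundle's callbacks are dropped.
    void crash() {
        buffered.clear();
        pendingCallbacks.clear();
    }

    // After a restart, reading resumes at the first uncommitted offset.
    long restartFrom() {
        return committedCursor + 1;
    }

    static long[] run() {
        DeferredCommitModel m = new DeferredCommitModel();
        // Bundle 1 (offsets 0..2) completes successfully.
        for (long offset = 0; offset <= 2; offset++) {
            m.output(offset);
            m.commitAfterBundle(offset);
        }
        m.flushBundle();
        // Bundle 2 (offsets 3..5) crashes before it is flushed.
        for (long offset = 3; offset <= 5; offset++) {
            m.output(offset);
            m.commitAfterBundle(offset);
        }
        m.crash();
        return new long[] {m.restartFrom(), m.delivered.size()};
    }
}
```

The trade-off in this model is at-least-once delivery: offsets 3 to 5 are read again after the restart, so downstream processing would need to tolerate replays.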

Thanks,
Yu
 

> On Jan 26, 2021, at 10:13, Boyuan Zhang <bo...@google.com> wrote:
> 
> +dev <ma...@beam.apache.org> 
> 
> Hi Yu,
> Which runner are you using for your pipeline? Also it would be helpful to share your pipeline code as well.
> 
> On Mon, Jan 25, 2021 at 10:19 PM <yu.b.zhang@oracle.com <ma...@oracle.com>> wrote:
> Hi Beam Community,
> 
> I have a splittable `DoFn` that reads messages from a stream and outputs the results downstream. The pseudo code looks like:
> @DoFn.ProcessElement
> public DoFn.ProcessContinuation processElement(@DoFn.Element SourceDescriptor sourceDescriptor,
>                                                RestrictionTracker<OffsetRange, Long> tracker,
>                                                WatermarkEstimator watermarkEstimator,
>                                                DoFn.OutputReceiver<Record> receiver) throws Exception {
>     while (true) {
>         List<Message> messages = getMessageFromStream();
>         if (messages.isEmpty()) {
>             return DoFn.ProcessContinuation.resume();
>         }
>         for (Message message : messages) {
>             // OffsetRange trackers claim Long offsets (assumes Message exposes getOffset())
>             if (!tracker.tryClaim(message.getOffset())) {
>                 return DoFn.ProcessContinuation.stop();
>             }
>             Record record = new Record(message);
>             receiver.outputWithTimestamp(record, message.getTimestamp());
>         }
>     }
> }
> 
> I expected to see each output downstream immediately, but the results are grouped into batches (of 4 or 5 outputs) before being emitted downstream. Is this batch size configurable in the `DoFn` or in the runner?
> 
> Thanks for any answer,
> Yu
>  
> 
>
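On the batching question in the original message: bundle boundaries are chosen by the runner rather than by the `DoFn`, and any setting that influences them is runner-specific. The toy model below (plain Java, hypothetical names, not the direct runner's actual logic) shows why records emitted one at a time by `output()` can appear downstream in groups: they only become visible when the enclosing bundle ends, here after a fixed, hypothetical `maxOutputsPerBundle` elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of bundle-at-a-time delivery, not runner code.
// Elements passed to output() become visible downstream only when their bundle ends.
final class BundleBatchingModel {
    static List<List<Integer>> deliver(int totalElements, int maxOutputsPerBundle) {
        List<List<Integer>> deliveredBundles = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int i = 0; i < totalElements; i++) {
            current.add(i); // output() only appends to the open bundle
            if (current.size() == maxOutputsPerBundle) {
                // bundle committed: downstream sees all of its elements at once
                deliveredBundles.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            deliveredBundles.add(current); // final partial bundle
        }
        return deliveredBundles;
    }
}
```

With 10 elements and a limit of 4, downstream sees three groups of sizes 4, 4 and 2 rather than ten individual records.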