Posted to dev@beam.apache.org by flyisland <fl...@gmail.com> on 2018/10/04 02:10:06 UTC

Re: Is Splittable DoFn suitable for fetching data from a socket server?

Hi Raghu,

> Assuming you need to ack on the same connection that served the records,
> finalize() functionality in the UnboundedSource API is an important use case.
> You can use the UnboundedSource API for now.

I've got a new question now: where should I keep the connection for the later
ack action?

MqttIO and JmsIO both ack messages in
the UnboundedSource.CheckpointMark.finalizeCheckpoint() method, but I found
that the javadoc says:

>  It is NOT safe to assume the UnboundedSource.UnboundedReader from which
this checkpoint was created still exists at the time this method is called.

I ran into exactly this situation in my testing with the Direct Runner: the
"msg.ack()" call failed when the finalizeCheckpoint() method was called
because the related reader had already been closed!

Is there any way to ask the runner to call the finalizeCheckpoint() method
before it closes the reader?
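
One idea I'm considering, just as a rough and untested sketch: let the reader
and every outstanding CheckpointMark share a reference-counted connection, so
the underlying socket is only closed after the last finalizeCheckpoint() has
released it. ServerConnection and its connect()/ack() methods below are
placeholders for our in-house client API, not real classes:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Rough sketch only. The reader opens the connection (refCount = 1), every
// CheckpointMark created from that reader calls retain(), and the reader's
// close() plus each finalizeCheckpoint() calls release(). The socket is closed
// only when the last holder releases it, so acks can still succeed after the
// reader itself has been closed. The CheckpointMark would keep this reference
// in a transient field, so it is simply lost (and the server redelivers) if
// the checkpoint is restored on a different worker.
final class SharedConnection {
    private final ServerConnection connection;
    private final AtomicInteger refCount = new AtomicInteger(1);

    private SharedConnection(ServerConnection connection) {
        this.connection = connection;
    }

    static SharedConnection open(String host, int port) throws IOException {
        return new SharedConnection(ServerConnection.connect(host, port));
    }

    SharedConnection retain() {
        refCount.incrementAndGet();
        return this;
    }

    void ack(long messageId) throws IOException {
        connection.ack(messageId);
    }

    void release() throws IOException {
        if (refCount.decrementAndGet() == 0) {
            connection.close();
        }
    }
}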


On Sat, Sep 22, 2018 at 7:01 AM Raghu Angadi <ra...@google.com> wrote:

> > This in-house built socket server can accept multiple clients, but
> > only sends messages to the first-connected client, and will send messages
> > to the second client if the first one disconnects.
>
> Server sending messages to the first client connection only is quite
> critical. Even if you use the Source API which honors the 'Setup()' JavaDoc,
> it is not enough in your case. Note that it says it reuses, but that does not
> guarantee a single DoFn instance or when it actually calls TearDown(). It is
> on a best-effort basis. The work could move to a different worker and the
> DoFn instance on the earlier worker can live for a long time. So if you held
> the connection to the server until TearDown() is called, you could be
> inadvertently blocking reads from the DoFn on the new worker. If you want to
> keep the connection open across bundles, you need some way to close an idle
> connection asynchronously (alternately, your service might have a timeout to
> close an idle client connection, which is much better). Since you can't
> afford to wait till TearDown(), you might as well have a singleton connection
> that gets closed after some idle time.
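
(Just to make sure I follow the suggestion, a minimal sketch of such a
singleton connection with an idle timeout is below. ServerConnection and its
connect()/isOpen() methods are again placeholders for the in-house client API,
and the timeout values are arbitrary.)

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: one connection per worker JVM, closed by a background task once
// it has been idle for a while; the next get() simply reconnects.
final class ConnectionHolder {
    private static final long IDLE_TIMEOUT_MS = 60_000;

    private static ServerConnection connection;
    private static long lastUsedMs;

    // Daemon scheduler that periodically closes the connection if it is idle.
    private static final ScheduledExecutorService REAPER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "connection-reaper");
            t.setDaemon(true);
            return t;
        });

    static {
        REAPER.scheduleAtFixedRate(ConnectionHolder::closeIfIdle, 10, 10, TimeUnit.SECONDS);
    }

    private ConnectionHolder() {}

    static synchronized ServerConnection get(String host, int port) throws IOException {
        if (connection == null || !connection.isOpen()) {
            connection = ServerConnection.connect(host, port);
        }
        lastUsedMs = System.currentTimeMillis();
        return connection;
    }

    private static synchronized void closeIfIdle() {
        if (connection != null
                && System.currentTimeMillis() - lastUsedMs > IDLE_TIMEOUT_MS) {
            try {
                connection.close();
            } catch (IOException ignored) {
                // Best effort; a later get() will reconnect.
            }
            connection = null;
        }
    }
}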
>
> Assuming you need to ack on the same connection that served the records,
> finalize() functionality in the UnboundedSource API is an important use case.
> You can use the UnboundedSource API for now.
>
> On Thu, Sep 20, 2018 at 8:25 PM flyisland <fl...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> There is no explicit ID in the message itself, and whether there is
>> information that can be used as an ID depends on the use case.
>>
>> On Fri, Sep 21, 2018 at 11:05 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Is there information in the message that can be used as an id for
>>> deduplication?
>>>
>>> On Thu, Sep 20, 2018 at 6:36 PM flyisland <fl...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> With the current API we provided, messages cannot be acked from a
>>>> different client.
>>>>
>>>> The server will re-send messages to the reconnected client if those
>>>> messages were not acked. So there'll be duplicate messages, but with a
>>>> "redeliver times" property in the header.
>>>>
>>>> Thanks for your helpful information, I'll check the UnboundedSources,
>>>> thanks!
>>>>
>>>>
>>>>
>>>> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Are duplicate messages ok?
>>>>>
>>>>> Can you ack messages from a different client or are messages sticky to
>>>>> a single client (e.g. if one client loses connection, when it reconnects
>>>>> can it ack messages it received or are those messages automatically
>>>>> replayed)?
>>>>>
>>>>> UnboundedSource is the only current "source" type that supports the
>>>>> finalization callbacks[1] you will need to ack messages, as well as
>>>>> deduplication[2]. SplittableDoFn will support both of these features but
>>>>> is not there yet.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>>>>> 2:
>>>>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
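
Since our messages carry no explicit id, I guess the reader's
getCurrentRecordId() (used together with requiresDeduping(), per [2]) would
have to return something derived from the payload. A tiny sketch of that idea,
assuming a plain String payload:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: derive a stable record id from the message payload so the
// runner can drop redelivered duplicates. Identical redeliveries map to the
// same id, but two distinct messages with identical payloads would collide.
final class RecordIds {
    private RecordIds() {}

    static byte[] idFor(String payload) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(payload.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }
}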
>>>>>
>>>>>
>>>>> On Wed, Sep 19, 2018 at 8:31 PM flyisland <fl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> This socket server is like an MQTT server; it has queues inside it
>>>>>> and the client should ack each message.
>>>>>>
>>>>>> > Is receiving and processing each message reliably important or is
>>>>>> it ok to drop messages when things fail?
>>>>>> A: Reliability is important; no messages should be lost.
>>>>>>
>>>>>> > Is there a message acknowledgement system or can you request a
>>>>>> position within the message stream (e.g. send all messages from position X
>>>>>> when connecting and if for whatever reason you need to reconnect you can
>>>>>> say send messages from position X to replay past messages)?
>>>>>> A: The client should ack each message it receives, and the server will
>>>>>> delete the acked message. If the client breaks and the server does not
>>>>>> receive an ack, the server will re-send the message to the client once it
>>>>>> is online again. And there is no "position" concept like Kafka's.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Before getting into what you could use and the current state of
>>>>>>> SplittableDoFn and its supported features, I was wondering what reliability
>>>>>>> guarantees the socket server has around messages.
>>>>>>>
>>>>>>> Is receiving and processing each message reliably important or is it
>>>>>>> ok to drop messages when things fail?
>>>>>>> Is there a message acknowledgement system or can you request a
>>>>>>> position within the message stream (e.g. send all messages from position X
>>>>>>> when connecting and if for whatever reason you need to reconnect you can
>>>>>>> say send messages from position X to replay past messages)?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <fl...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi Gurus,
>>>>>>>>
>>>>>>>> I'm trying to create an IO connector to fetch data from a socket
>>>>>>>> server in Beam. I'm new to Beam, but according to this blog <
>>>>>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>>>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>>>>>
>>>>>>>> This in-house built socket server can accept multiple clients,
>>>>>>>> but only sends messages to the first-connected client, and will send
>>>>>>>> messages to the second client if the first one disconnects.
>>>>>>>>
>>>>>>>> To understand the lifecycle of a DoFn, I've created a very simple
>>>>>>>> DoFn subclass that calls log.debug() in every method. According to
>>>>>>>> the JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
>>>>>>>> in-memory resources, such as network connections. The resources can then be
>>>>>>>> disposed in DoFn.Teardown.", so I guess I should create the connection to
>>>>>>>> the socket server in the setup() method.
>>>>>>>>
>>>>>>>> But based on the log messages below, even though the input PCollection
>>>>>>>> has only one element, Beam still creates multiple DemoIO instances and
>>>>>>>> invokes a different DemoIO instance after every checkpoint.
>>>>>>>>
>>>>>>>> I'm wondering:
>>>>>>>> 1. How could I make Beam create only one DemoIO instance, or at
>>>>>>>> least use the same instance consistently?
>>>>>>>> 2. Or should I use the Source API for this purpose?
>>>>>>>>
>>>>>>>> Thanks in advance.
>>>>>>>>
>>>>>>>> Logs:
>>>>>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>>>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> First->getInitialRestriction() is called!
>>>>>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>>>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>>>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>>>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
>>>>>>>> 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>>>>>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>>>>>>>> called!
>>>>>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>>>>>>>> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>>>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>>>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>>>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>>>>> 2018-09-18T23:15:56.285Z -> 0 -> First
>>>>>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>>>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>>>>> 2018-09-18T23:15:56.786Z -> 1 -> First
>>>>>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
>>>>>>>> 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>>>>>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>>>>>>>> called!
>>>>>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>>>>>>>> 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>>>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>>>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>>>>> 2018-09-18T23:15:57.354Z -> 2 -> First
>>>>>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>>>>> 2018-09-18T23:15:57.856Z -> 3 -> First
>>>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>>>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>>>>> 2018-09-18T23:15:58.358Z -> 4 -> First
>>>>>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
>>>>>>>> 9223372036854775807)->newTracker() is called!
>>>>>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>>>>>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>>>>>>>> called!
>>>>>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>>>>>>>> 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>>>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO -
>>>>>>>> org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>>>>>
>>>>>>>> WindowedWordCountSDF.java
>>>>>>>>
>>>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>>> List<String> LINES = Arrays.asList("First");
>>>>>>>> PCollection<String> input =
>>>>>>>>     pipeline
>>>>>>>>             .apply(Create.of(LINES))
>>>>>>>>             .apply(ParDo.of(new DemoIO()));
>>>>>>>> ...
>>>>>>>>
>>>>>>>>
>>>>>>>> DemoIO.java
>>>>>>>>
>>>>>>>> public class DemoIO extends DoFn<String, String> {
>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>>>>>
>>>>>>>>     public DemoIO(){
>>>>>>>>         super();
>>>>>>>>         LOG.debug("{}->new DemoIO() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>>>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>>>>>>>
>>>>>>>>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>>>>>             sleep(500);
>>>>>>>>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>>>>>         }
>>>>>>>>         LOG.debug("{}->process({}) end!", this, tracker);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @GetInitialRestriction
>>>>>>>>     public OffsetRange getInitialRestriction(String input) {
>>>>>>>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>>>>>         return new OffsetRange(0, Long.MAX_VALUE);
>>>>>>>> //        return new OffsetRange(0, 100);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @NewTracker
>>>>>>>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>>>>         LOG.debug("{}->newTracker() is called!", range);
>>>>>>>>         return new OffsetRangeTracker(range);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Setup
>>>>>>>>     public void setup(){
>>>>>>>>         LOG.debug("{}->setup() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @StartBundle
>>>>>>>>     public void startBundle(){
>>>>>>>>         LOG.debug("{}->startBundle() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @FinishBundle
>>>>>>>>     public void finishBundle(){
>>>>>>>>         LOG.debug("{}->finishBundle() is called!", this);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private void sleep(long millis) {
>>>>>>>>         try {
>>>>>>>>             Thread.sleep(millis);
>>>>>>>>         } catch (InterruptedException e) {
>>>>>>>>             Thread.currentThread().interrupt();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }