Posted to dev@beam.apache.org by "Chawla,Sumit " <su...@gmail.com> on 2016/07/27 23:30:49 UTC

Fwd: Suggestion for Writing Sink Implementation

Hi

Please suggest the best way to write a Sink in Beam.  I see that there is
a Sink<T> abstract class which is in an experimental state.  What is the
expected outcome for it? Is the API frozen, or could it still change?
Most of the existing sink implementations, such as KafkaIO.Write, do not
use this interface and instead extend
PTransform<PCollection<KV<K, V>>, PDone>. Will these be changed to extend
Sink<>?


My immediate requirement is to run this Sink on the FlinkRunner, which
mandates that my implementation also implement SinkFunction<>.  In that
case, none of the Sink<> methods get called anyway.

Regards
Sumit Chawla

Re: Suggestion for Writing Sink Implementation

Posted by Maximilian Michels <mx...@apache.org>.
Hi Kenneth,

The problem is that the Write transform is not supported in streaming
execution of the Flink Runner because streaming execution doesn't
currently support side inputs. A PR is open to fix that.

Cheers,
Max

On Thu, Jul 28, 2016 at 8:56 PM, Kenneth Knowles <kl...@google.com.invalid> wrote:
> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bound transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other Write
> transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the writing.
> If you can share a little about your sink, we may be able to suggest
> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> kirpichov@google.com.invalid> wrote:
>
>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>> runner, and I'll then defer to somebody familiar with it.
>>
>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com>
>> wrote:
>>
>> > Thanks a lot Eugene.
>> >
>> > > My immediate requirement is to run this Sink on FlinkRunner. Which
>> > > mandates that my implementation must also implement SinkFunction<>.
>> > > In that case, none of the Sink<> methods get called anyway.
>> >
>> > I am using FlinkRunner. The Sink implementation that I was writing by
>> > extending the Sink<> class had to implement the Flink-specific
>> > SinkFunction for the correct translation.
>> >
>> > private static class WriteSinkStreamingTranslator<T> implements
>> >     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
>> >
>> >   @Override
>> >   public void translateNode(Write.Bound<T> transform,
>> >       FlinkStreamingTranslationContext context) {
>> >     String name = transform.getName();
>> >     PValue input = context.getInput(transform);
>> >
>> >     Sink<T> sink = transform.getSink();
>> >     if (!(sink instanceof UnboundedFlinkSink)) {
>> >       throw new UnsupportedOperationException(
>> >           "At the time, only unbounded Flink sinks are supported.");
>> >     }
>> >
>> >     DataStream<WindowedValue<T>> inputDataSet =
>> >         context.getInputDataStream(input);
>> >
>> >     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>> >       @Override
>> >       public void flatMap(WindowedValue<T> value, Collector<Object> out)
>> >           throws Exception {
>> >         out.collect(value.getValue());
>> >       }
>> >     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
>> >   }
>> > }
>> >
>> >
>> >
>> >
>> > Regards
>> > Sumit Chawla
>> >
>> >
>> > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>> > kirpichov@google.com.invalid> wrote:
>> >
>> > > Hi Sumit,
>> > >
>> > > All reusable parts of a pipeline, including connectors to storage
>> > > systems, should be packaged as PTransforms.
>> > >
>> > > Sink is an advanced API that you can use under the hood to implement
>> > > the transform, if this particular connector benefits from this API -
>> > > but you don't have to, and many connectors indeed don't need it, and
>> > > are simpler to implement just as wrappers around a couple of ParDos
>> > > writing the data.
>> > >
>> > > Even if the connector is implemented using a Sink, packaging the
>> > > connector as a PTransform is important because it's easier to apply
>> > > in a pipeline and because it's more future-proof (the author of the
>> > > connector may later change it to use something other than Sink under
>> > > the hood without breaking existing users).
>> > >
>> > > Sink is, currently, useful in the following case:
>> > > - You're writing a bounded amount of data (we do not yet have an
>> > >   unbounded Sink analogue)
>> > > - The location you're writing to is known at pipeline construction
>> > >   time, and does not depend on the data itself (support for
>> > >   "data-dependent" sinks is on the radar
>> > >   https://issues.apache.org/jira/browse/BEAM-92)
>> > > - The storage system you're writing to has a distinct "initialization"
>> > >   and "finalization" step, allowing the write operation to appear
>> > >   atomic (either all data is written or none). This mostly applies to
>> > >   files (where writing is done by first writing to a temporary
>> > >   directory, and then renaming all files to their final location),
>> > >   but there can be other cases too.
>> > >
>> > > Here's an example GCP connector using the Sink API under the hood:
>> > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>> > >
>> > > Most other non-file-based connectors, indeed, don't (KafkaIO,
>> > > DatastoreIO, BigtableIO etc.)
>> > >
>> > > I'm not familiar with the Flink API, however I'm a bit confused by your
>> > > last paragraph: the Beam programming model is intentionally
>> > > runner-agnostic, so that you can run exactly the same code on different
>> > > runners.
>> > >
>>
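
The "initialization"/"finalization" pattern Eugene describes above for
file-like sinks could be sketched roughly as follows. This is plain Java
with no Beam classes; the class AtomicFileWriteSketch and its methods
(writeBundle, finalizeWrite, countFiles, runDemo) are hypothetical names
made up for this illustration, and moving files one by one only
approximates an atomic write.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch: plain Java, no Beam types.
final class AtomicFileWriteSketch {

  // Write phase: each bundle writes its records to its own file in the temporary directory.
  static Path writeBundle(Path tempDir, int bundleId, List<String> records) throws IOException {
    Path tempFile = tempDir.resolve("bundle-" + bundleId + ".txt");
    Files.write(tempFile, records, StandardCharsets.UTF_8);
    return tempFile;
  }

  // Finalization: move every temporary file into the final directory, then drop the temp dir.
  static List<Path> finalizeWrite(Path tempDir, Path finalDir, List<Path> tempFiles)
      throws IOException {
    List<Path> published = new ArrayList<>();
    for (Path tempFile : tempFiles) {
      Path target = finalDir.resolve(tempFile.getFileName());
      Files.move(tempFile, target, StandardCopyOption.REPLACE_EXISTING);
      published.add(target);
    }
    Files.delete(tempDir); // succeeds only if the directory is now empty
    return published;
  }

  // Counts the entries directly inside a directory.
  static long countFiles(Path dir) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.count();
    }
  }

  // Runs initialization, two bundle writes, and finalization; returns the published file count.
  static long runDemo() {
    try {
      // Initialization: create the final directory and a separate temporary directory.
      Path base = Files.createTempDirectory("sink-sketch-");
      Path finalDir = Files.createDirectories(base.resolve("output"));
      Path tempDir = Files.createTempDirectory(base, "tmp-write-");

      List<Path> tempFiles = new ArrayList<>();
      tempFiles.add(writeBundle(tempDir, 0, Arrays.asList("a", "b")));
      tempFiles.add(writeBundle(tempDir, 1, Arrays.asList("c")));

      // Nothing is visible in the final location until finalization runs.
      if (countFiles(finalDir) != 0) {
        throw new IllegalStateException("output must stay empty until finalization");
      }
      finalizeWrite(tempDir, finalDir, tempFiles);
      return countFiles(finalDir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("published files: " + runDemo());
  }
}
```

If a bundle fails before finalizeWrite runs, the final directory is left
untouched, which is the property that lets the write appear all-or-nothing
to readers of the final location.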

Re: Suggestion for Writing Sink Implementation

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Fri, Jul 29, 2016 at 12:56 PM, Dan Halperin <dh...@google.com.invalid>
wrote:

>
> BEAM-452 and https://github.com/apache/incubator-beam/pull/690
>
> Raghu, do you see this cache necessary once that work is in?


Nope. I didn't realize the feature was close to being merged. Thanks!

Re: Suggestion for Writing Sink Implementation

Posted by Dan Halperin <dh...@google.com.INVALID>.
The upcoming work on DoFn setup/teardown completely solves the issue in
KafkaIO. You open the connection in setup, but it is used for multiple
bundles. However, this is not in yet and so yes, the producer is
opened/closed every bundle.
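
To make the difference concrete, here is a small plain-Java simulation of
the two lifecycles. It is only a sketch under assumptions: it uses no Beam
classes, and BundleLifecycleSketch, Stats, runPerBundle and
runWithSetupTeardown are hypothetical names invented for this example, not
the API from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation; none of these types are Beam classes.
final class BundleLifecycleSketch {

  // Stand-in bookkeeping for a client such as a Kafka producer.
  static final class Stats {
    int opens;
    int closes;
    final List<String> written = new ArrayList<>();
  }

  // Per-bundle lifecycle: the connection is opened and closed around every bundle.
  static Stats runPerBundle(List<List<String>> bundles) {
    Stats stats = new Stats();
    for (List<String> bundle : bundles) {
      stats.opens++;                // start of bundle: open the connection
      stats.written.addAll(bundle); // process each element of the bundle
      stats.closes++;               // end of bundle: close the connection
    }
    return stats;
  }

  // Setup/teardown lifecycle: one connection is opened once and reused by all bundles.
  static Stats runWithSetupTeardown(List<List<String>> bundles) {
    Stats stats = new Stats();
    stats.opens++;                  // setup: open the connection once
    for (List<String> bundle : bundles) {
      stats.written.addAll(bundle); // process elements; end of bundle would only flush
    }
    stats.closes++;                 // teardown: close the connection once
    return stats;
  }

  // Three small bundles holding five elements in total.
  static List<List<String>> sampleBundles() {
    List<List<String>> bundles = new ArrayList<>();
    bundles.add(Arrays.asList("a", "b"));
    bundles.add(Arrays.asList("c"));
    bundles.add(Arrays.asList("d", "e"));
    return bundles;
  }

  public static void main(String[] args) {
    Stats perBundle = runPerBundle(sampleBundles());
    Stats perInstance = runWithSetupTeardown(sampleBundles());
    System.out.println("per-bundle opens: " + perBundle.opens);
    System.out.println("setup/teardown opens: " + perInstance.opens);
  }
}
```

Both variants write the same elements; only the number of connection
opens changes, from one per bundle to one per writer instance.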

BEAM-452 and https://github.com/apache/incubator-beam/pull/690

Raghu, do you see this cache necessary once that work is in?

On Fri, Jul 29, 2016 at 12:46 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Sumit,
>
> Not sure I follow you.
>
> Which resource cleanup are you talking about:
> - the close() on the reader (source) ?
> - the finishBundle() on the writer (sink) ?
>
> Regards
> JB
>
>
> On 07/29/2016 09:35 PM, Chawla,Sumit  wrote:
>
>> Hi Raghu
>>
>> My source is going to be unbounded (streaming) with writes to Cassandra.
>> My only concern with KafkaIO.write is that the producer is closed after
>> every bundle, and every bundle may have to open a new connection to Kafka.
>> (Please correct me if I am wrong: I am assuming a bundle is equivalent
>> to the window size/mini-batch.)
>>
>> In Jean's implementation I see a different style of resource cleanup. Can
>> someone please explain when that finalize method is called?
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <rangadi@google.com.invalid>
>> wrote:
>>
>>> It is the preferred pattern I think. Is your source bounded or unbounded
>>> (i.e. streaming)? If it is the latter, your sink could be even simpler
>>> than JB's, e.g. KafkaIO.write(), where it just writes the messages to
>>> Kafka in processElement().
>>>
>>> The pros are pretty clear: runner independent, pure Beam, simpler code.
>>> Cons: no checkpoint/rollback; I don't know if the Flink-specific sink
>>> provides this either.
>>>
>>> On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <su...@gmail.com>
>>> wrote:
>>>
>>>> Any more comments on this pattern suggested by Jean?
>>>>
>>>> Regards
>>>> Sumit Chawla
>>>>
>>>> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <klk@google.com.invalid>
>>>> wrote:
>>>>
>>>>> What I said earlier is not quite accurate, though my advice is the
>>>>> same. Here are the corrections:
>>>>>
>>>>>  - The Write transform actually has a too-general name, and
>>>>>    Write.of(Sink) only really works for finite data. It re-windows
>>>>>    into the global window and replaces any triggers.
>>>>>  - So the special case in the Flink runner actually just _enables_ a
>>>>>    (fake) Sink to work.
>>>>>
>>>>> We should probably rename Write to some more specific name that
>>>>> indicates the particular strategy, and make it easier for a user to
>>>>> decide whether that pattern is what they want. And the transform as-is
>>>>> should probably reject unbounded inputs.
>>>>>
>>>>> So you should still proceed with implementation via ParDo and your own
>>>>> logic. If you want some logic similar to Write (but with different
>>>>> windowing and triggering) then it is a pretty simple composite to
>>>>> derive something from.
>>>>>
>>>>> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <sumitkchawla@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Jean
>>>>>>
>>>>>> This is an interesting pattern here.  I see that it's implemented as a
>>>>>> PTransform, with constructs (WriteOperation/Writer) pretty similar to
>>>>>> the Sink<T> interface.  Would love to hear more pros/cons of this
>>>>>> pattern :). Definitely it gives more control over connection
>>>>>> initialization and cleanup.
>>>>>>
>>>>>> Regards
>>>>>> Sumit Chawla
>>>>>>
>>>>>> On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <jb@nanthrax.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Sumit,
>>>>>>>
>>>>>>> I created a PR containing Cassandra IO with a sink:
>>>>>>>
>>>>>>> https://github.com/apache/incubator-beam/pull/592
>>>>>>>
>>>>>>> Maybe it can help you.
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
>>>>>>>
>>>>>>>> Hi Kenneth
>>>>>>>>
>>>>>>>> Thanks for looking into it. I am currently trying to implement Sinks
>>>>>>>> for writing data into Cassandra/Titan DB.  My immediate goal is to
>>>>>>>> run it on Flink Runner.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Sumit Chawla
>>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbonofre@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: Suggestion for Writing Sink Implementation

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi JB

I was referring to CassandraWriteOperation.finalize()

Regards
Sumit Chawla


On Fri, Jul 29, 2016 at 12:46 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Sumit,
>
> Not sure I follow you.
>
> Which resource cleanup are you talking about:
> - the close() on the reader (source) ?
> - the finishBundle() on the writer (sink) ?
>
> Regards
> JB
>
>
> On 07/29/2016 09:35 PM, Chawla,Sumit  wrote:
>
>> Hi Raghu
>>
>> My source is going to be unbounded (streaming) with writes to Cassandra.
>> Only concern with KafkaIO. write is that producer is closed after every
>> bundle, and every bundle may have to open a new connection to Kafka.  (
>> Please correct me if i am wrong: I am assuming the bundle to be equivalent
>> to Window Size\Mini-batch).
>>
>> In Jean's implementation i see a different style of resource cleanup. Can
>> someone please explain when that finalize method is called?
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <rangadi@google.com.invalid
>> >
>> wrote:
>>
>> It is the preferred pattern I think. Is your source bounded or unbounded
>>> (i.e. streaming)? If it is latter, your sink could even be simpler than
>>> JB's. e.g. KafkaIO.write() where it just writes the messages to Kafka in
>>> processElement().
>>>
>>> The pros are pretty clear : runner independent, pure Beam, simpler code.
>>> cons : no checkpoint/rollback, I don't know if Flink specific sink
>>> provides
>>> this either.
>>>
>>> On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <su...@gmail.com>
>>> wrote:
>>>
>>> Any more comments on this pattern suggested by Jean?
>>>>
>>>> Regards
>>>> Sumit Chawla
>>>>
>>>>
>>>> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <klk@google.com.invalid
>>>>
>>>> wrote:
>>>>
>>>> What I said earlier is not quite accurate, though my advice is the
>>>>>
>>>> same.
>>>
>>>> Here are the corrections:
>>>>>
>>>>>  - The Write transform actually has a too-general name, and
>>>>>
>>>> Write.of(Sink)
>>>>
>>>>> only really works for finite data. It re-windows into the global window
>>>>>
>>>> and
>>>>
>>>>> replaces any triggers.
>>>>>  - So the special case in the Flink runner actually just _enables_ a
>>>>>
>>>> (fake)
>>>>
>>>>> Sink to work.
>>>>>
>>>>> We should probably rename Write to some more specific name that
>>>>>
>>>> indicates
>>>
>>>> the particular strategy, and make it easier for a user to decide
>>>>>
>>>> whether
>>>
>>>> that pattern is what they want. And the transform as-is should probably
>>>>> reject unbounded inputs.
>>>>>
>>>>> So you should still proceed with implementation via ParDo and your own
>>>>> logic. If you want some logic similar to Write (but with different
>>>>> windowing and triggering) then it is a pretty simple composite to
>>>>>
>>>> derive
>>>
>>>> something from.
>>>>>
>>>>> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <sumitkchawla@gmail.com
>>>>>
>>>>
>>>> wrote:
>>>>>
>>>>> Thanks Jean
>>>>>>
>>>>>> This is an interesting pattern here.  I see that its implemented as
>>>>>> PTransform, with constructs ( WriteOperation/Writer)  pretty similar
>>>>>>
>>>>> to
>>>
>>>> Sink<T> interface.  Would love to hear more pros/cons of this pattern
>>>>>>
>>>>> :)
>>>>
>>>>> .
>>>>>
>>>>>> Definitely it gives more control over connection initialization and
>>>>>> cleanup.
>>>>>>
>>>>>> Regards
>>>>>> Sumit Chawla
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <
>>>>>>
>>>>> jb@nanthrax.net>
>>>>
>>>>> wrote:
>>>>>>
>>>>>> Hi Sumit,
>>>>>>>
>>>>>>> I created a PR containing Cassandra IO with a sink:
>>>>>>>
>>>>>>> https://github.com/apache/incubator-beam/pull/592
>>>>>>>
>>>>>>> Maybe it can help you.
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>>
>>>>>>> On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
>>>>>>>
>>>>>>> Hi Kenneth
>>>>>>>>
>>>>>>>> Thanks for looking into it. I am currently trying to implement
>>>>>>>>
>>>>>>> Sinks
>>>
>>>> for
>>>>>
>>>>>> writing data into Cassandra/Titan DB.  My immediate goal is to run
>>>>>>>>
>>>>>>> it
>>>>
>>>>> on
>>>>>
>>>>>> Flink Runner.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Sumit Chawla
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles
>>>>>>>>
>>>>>>> <klk@google.com.invalid
>>>>>>
>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Sumit,
>>>>>>>>
>>>>>>>>>
>>>>>>>>> I see what has happened here, from that snippet you pasted from
>>>>>>>>>
>>>>>>>> the
>>>
>>>> Flink
>>>>>>
>>>>>>> runner's code [1]. Thanks for looking into it!
>>>>>>>>>
>>>>>>>>> The Flink runner today appears to reject Write.Bounded transforms
>>>>>>>>>
>>>>>>>> in
>>>>
>>>>> streaming mode if the sink is not an instance of
>>>>>>>>>
>>>>>>>> UnboundedFlinkSink.
>>>>
>>>>> The
>>>>>>
>>>>>>> intent of that code, I believe, was to special case
>>>>>>>>>
>>>>>>>> UnboundedFlinkSink
>>>>>
>>>>>> to
>>>>>>
>>>>>>> make it easy to use an existing Flink sink, not to disable all
>>>>>>>>>
>>>>>>>> other
>>>>
>>>>> Write
>>>>>>>>> transforms. What do you think, Max?
>>>>>>>>>
>>>>>>>>> Until we fix this issue, you should use ParDo transforms to do
>>>>>>>>>
>>>>>>>> the
>>>
>>>> writing.
>>>>>>>>> If you can share a little about your sink, we may be able to
>>>>>>>>>
>>>>>>>> suggest
>>>>
>>>>> patterns for implementing it. Like Eugene said, the
>>>>>>>>>
>>>>>>>> Write.of(Sink)
>>>
>>>> transform is just a specialized pattern of ParDo's, not a Beam
>>>>>>>>>
>>>>>>>> primitive.
>>>>>>
>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>>
>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>> Thanks Sumit. Looks like your question is, indeed, specific to
>>>>>>>>>
>>>>>>>> the
>>>
>>>> Flink
>>>>>>
>>>>>>> runner, and I'll then defer to somebody familiar with it.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <
>>>>>>>>>>
>>>>>>>>> sumitkchawla@gmail.com>
>>>>>
>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks a lot Eugene.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner.
>>>>>>>>>>>
>>>>>>>>>> Which
>>>>
>>>>>
>>>>>>>>>>>>>> mandates that my implementation must also implement
>>>>>>>>>>>>>
>>>>>>>>>>>> SinkFunction<>.
>>>>>
>>>>>> In
>>>>>>>>>>> that >>>case, none of the Sink<> methods get called anyway.
>>>>>>>>>>>
>>>>>>>>>>> I am using FlinkRunner. The Sink implementation that i was
>>>>>>>>>>>
>>>>>>>>>> writing
>>>>
>>>>> by
>>>>>
>>>>>> extending Sink<> class had to implement Flink Specific
>>>>>>>>>>>
>>>>>>>>>> SinkFunction
>>>>
>>>>> for
>>>>>>
>>>>>>>
>>>>>>>>>>> the
>>>>>>>>>>
>>>>>>>>>> correct translation.
>>>>>>>>>>>
>>>>>>>>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
>>>
>>>>
>>>>>>>>> {
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    @Override
>>>>>>>>>>>    public void translateNode(Write.Bound<T> transform,
>>>>>>>>>>> FlinkStreamingTranslationContext context) {
>>>>>>>>>>>      String name = transform.getName();
>>>>>>>>>>>      PValue input = context.getInput(transform);
>>>>>>>>>>>
>>>>>>>>>>>      Sink<T> sink = transform.getSink();
>>>>>>>>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>>>>>>>        throw new UnsupportedOperationException("At the time,
>>>>>>>>>>>
>>>>>>>>>> only
>>>
>>>> unbounded Flink sinks are supported.");
>>>>>>>>>>>      }
>>>>>>>>>>>
>>>>>>>>>>>      DataStream<WindowedValue<T>> inputDataSet =
>>>>>>>>>>> context.getInputDataStream(input);
>>>>>>>>>>>
>>>>>>>>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>,
>>>>>>>>>>>
>>>>>>>>>>> Object>()
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>>
>>>>>>>>>>        @Override
>>>>>>>>>>>        public void flatMap(WindowedValue<T> value,
>>>>>>>>>>>
>>>>>>>>>> Collector<Object>
>>>>>
>>>>>> out) throws Exception {
>>>>>>>>>>>          out.collect(value.getValue());
>>>>>>>>>>>        }
>>>>>>>>>>>      }).addSink(((UnboundedFlinkSink<Object>)
>>>>>>>>>>> sink).getFlinkSource()).name(name);
>>>>>>>>>>>    }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> Sumit Chawla
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>>>>>>>>>>> kirpichov@google.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sumit,
>>>>>>>>>>>>
>>>>>>>>>>>> All reusable parts of a pipeline, including connectors to storage
>>>>>>>>>>>> systems, should be packaged as PTransform's.
>>>>>>>>>>>>
>>>>>>>>>>>> Sink is an advanced API that you can use under the hood to implement
>>>>>>>>>>>> the transform, if this particular connector benefits from this API -
>>>>>>>>>>>> but you don't have to, and many connectors indeed don't need it, and
>>>>>>>>>>>> are simpler to implement just as wrappers around a couple of ParDo's
>>>>>>>>>>>> writing the data.
>>>>>>>>>>>>
>>>>>>>>>>>> Even if the connector is implemented using a Sink, packaging the
>>>>>>>>>>>> connector as a PTransform is important because it's easier to apply
>>>>>>>>>>>> in a pipeline and because it's more future-proof (the author of the
>>>>>>>>>>>> connector may later change it to use something else rather than Sink
>>>>>>>>>>>> under the hood without breaking existing users).
>>>>>>>>>>>>
>>>>>>>>>>>> Sink is, currently, useful in the following case:
>>>>>>>>>>>> - You're writing a bounded amount of data (we do not yet have an
>>>>>>>>>>>>   unbounded Sink analogue)
>>>>>>>>>>>> - The location you're writing to is known at pipeline construction
>>>>>>>>>>>>   time, and does not depend on the data itself (support for
>>>>>>>>>>>>   "data-dependent" sinks is on the radar
>>>>>>>>>>>>   https://issues.apache.org/jira/browse/BEAM-92 )
>>>>>>>>>>>> - The storage system you're writing to has a distinct
>>>>>>>>>>>>   "initialization" and "finalization" step, allowing the write
>>>>>>>>>>>>   operation to appear atomic (either all data is written or none).
>>>>>>>>>>>>   This mostly applies to files (where writing is done by first
>>>>>>>>>>>>   writing to a temporary directory, and then renaming all files to
>>>>>>>>>>>>   their final location), but there can be other cases too.
>>>>>>>>>>>>
>>>>>>>>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>>>>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>>>>>>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
>>>>>>>>>>>> DatastoreIO, BigtableIO etc.)
>>>>>>>>>>>>
>>>>>>>>>>>> I'm not familiar with the Flink API, however I'm a bit confused by
>>>>>>>>>>>> your last paragraph: the Beam programming model is intentionally
>>>>>>>>>>>> runner-agnostic, so that you can run exactly the same code on
>>>>>>>>>>>> different runners.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest me on what is the best way to write a Sink in Beam.  I see
>>>>>>>>>>>>> that there is a Sink<T> abstract class which is in experimental state.
>>>>>>>>>>>>> What is the expected outcome of this one? Do we have the api frozen, or
>>>>>>>>>>>>> this could still change?  Most of the existing Sink implementations like
>>>>>>>>>>>>> KafkaIO.Write are not using this interface, and instead extends
>>>>>>>>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to extend
>>>>>>>>>>>>> Sink<>.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which mandates
>>>>>>>>>>>>> that my implementation must also implement SinkFunction<>.  In that case,
>>>>>>>>>>>>> none of the Sink<> methods get called anyway.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> Sumit Chawla
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbonofre@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: Suggestion for Writing Sink Implementation

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Sumit,

I'm not sure I follow you.

Which resource cleanup are you talking about?
- the close() on the reader (source)?
- the finishBundle() on the writer (sink)?

Regards
JB

On 07/29/2016 09:35 PM, Chawla,Sumit  wrote:
> Hi Raghu
>
> My source is going to be unbounded (streaming), with writes to Cassandra.
> My only concern with KafkaIO.write() is that the producer is closed after
> every bundle, so every bundle may have to open a new connection to Kafka.
> (Please correct me if I am wrong: I am assuming a bundle is equivalent to
> the window size/mini-batch.)
>
> In Jean's implementation I see a different style of resource cleanup. Can
> someone please explain when that finalize method is called?
>
> Regards
> Sumit Chawla
>
>
> On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <ra...@google.com.invalid>
> wrote:
>
>> It is the preferred pattern I think. Is your source bounded or unbounded
>> (i.e. streaming)? If it is latter, your sink could even be simpler than
>> JB's. e.g. KafkaIO.write() where it just writes the messages to Kafka in
>> processElement().
>>
>> The pros are pretty clear : runner independent, pure Beam, simpler code.
>> cons : no checkpoint/rollback, I don't know if Flink specific sink provides
>> this either.
>>
>> On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <su...@gmail.com>
>> wrote:
>>
>>> Any more comments on this pattern suggested by Jean?
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <klk@google.com.invalid>
>>> wrote:
>>>
>>>> What I said earlier is not quite accurate, though my advice is the same.
>>>> Here are the corrections:
>>>>
>>>>  - The Write transform actually has a too-general name, and Write.of(Sink)
>>>>    only really works for finite data. It re-windows into the global window
>>>>    and replaces any triggers.
>>>>  - So the special case in the Flink runner actually just _enables_ a (fake)
>>>>    Sink to work.
>>>>
>>>> We should probably rename Write to some more specific name that indicates
>>>> the particular strategy, and make it easier for a user to decide whether
>>>> that pattern is what they want. And the transform as-is should probably
>>>> reject unbounded inputs.
>>>>
>>>> So you should still proceed with implementation via ParDo and your own
>>>> logic. If you want some logic similar to Write (but with different
>>>> windowing and triggering) then it is a pretty simple composite to derive
>>>> something from.
>>>>
>>>> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <sumitkchawla@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Jean
>>>>>
>>>>> This is an interesting pattern here.  I see that its implemented as
>>>>> PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
>>>>> Sink<T> interface.  Would love to hear more pros/cons of this pattern :) .
>>>>> Definitely it gives more control over connection initialization and
>>>>> cleanup.
>>>>>
>>>>> Regards
>>>>> Sumit Chawla
>>>>>
>>>>>
>>>>> On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <jb@nanthrax.net>
>>>>> wrote:
>>>>>
>>>>>> Hi Sumit,
>>>>>>
>>>>>> I created a PR containing Cassandra IO with a sink:
>>>>>>
>>>>>> https://github.com/apache/incubator-beam/pull/592
>>>>>>
>>>>>> Maybe it can help you.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>>
>>>>>> On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
>>>>>>
>>>>>>> Hi Kenneth
>>>>>>>
>>>>>>> Thanks for looking into it. I am currently trying to implement Sinks for
>>>>>>> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
>>>>>>> Flink Runner.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Regards
>>>>>>> Sumit Chawla
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <klk@google.com.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Sumit,
>>>>>>>>
>>>>>>>> I see what has happened here, from that snippet you pasted from the Flink
>>>>>>>> runner's code [1]. Thanks for looking into it!
>>>>>>>>
>>>>>>>> The Flink runner today appears to reject Write.Bounded transforms in
>>>>>>>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
>>>>>>>> intent of that code, I believe, was to special case UnboundedFlinkSink to
>>>>>>>> make it easy to use an existing Flink sink, not to disable all other Write
>>>>>>>> transforms. What do you think, Max?
>>>>>>>>
>>>>>>>> Until we fix this issue, you should use ParDo transforms to do the writing.
>>>>>>>> If you can share a little about your sink, we may be able to suggest
>>>>>>>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
>>>>>>>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
>>>>>>>> kirpichov@google.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>>>>>>>>> runner, and I'll then defer to somebody familiar with it.
>>>>>>>>>
>>>>>>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkchawla@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks a lot Eugene.
>>>>>>>>>>
>>>>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>>>>>>> mandates that my implementation must also implement SinkFunction<>.
>>>>>>>>>>>>> In that case, none of the Sink<> methods get called anyway.
>>>>>>>>>>
>>>>>>>>>> I am using FlinkRunner. The Sink implementation that i was writing by
>>>>>>>>>> extending Sink<> class had to implement Flink Specific SinkFunction for
>>>>>>>>>> the correct translation.
>>>>>>>>>>
>>>>>>>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>>>>>>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
>>>>>>>>>>
>>>>>>>>>>    @Override
>>>>>>>>>>    public void translateNode(Write.Bound<T> transform,
>>>>>>>>>>        FlinkStreamingTranslationContext context) {
>>>>>>>>>>      String name = transform.getName();
>>>>>>>>>>      PValue input = context.getInput(transform);
>>>>>>>>>>
>>>>>>>>>>      Sink<T> sink = transform.getSink();
>>>>>>>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>>>>>>        throw new UnsupportedOperationException(
>>>>>>>>>>            "At the time, only unbounded Flink sinks are supported.");
>>>>>>>>>>      }
>>>>>>>>>>
>>>>>>>>>>      DataStream<WindowedValue<T>> inputDataSet =
>>>>>>>>>>          context.getInputDataStream(input);
>>>>>>>>>>
>>>>>>>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>>>>>>>>>>        @Override
>>>>>>>>>>        public void flatMap(WindowedValue<T> value, Collector<Object> out)
>>>>>>>>>>            throws Exception {
>>>>>>>>>>          out.collect(value.getValue());
>>>>>>>>>>        }
>>>>>>>>>>      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
>>>>>>>>>>    }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> Sumit Chawla
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sumit,
>>>>>>>>>>>
>>>>>>>>>>> All reusable parts of a pipeline, including connectors to storage
>>>>>>>>>>> systems, should be packaged as PTransform's.
>>>>>>>>>>>
>>>>>>>>>>> Sink is an advanced API that you can use under the hood to implement
>>>>>>>>>>> the transform, if this particular connector benefits from this API -
>>>>>>>>>>> but you don't have to, and many connectors indeed don't need it, and
>>>>>>>>>>> are simpler to implement just as wrappers around a couple of ParDo's
>>>>>>>>>>> writing the data.
>>>>>>>>>>>
>>>>>>>>>>> Even if the connector is implemented using a Sink, packaging the
>>>>>>>>>>> connector as a PTransform is important because it's easier to apply
>>>>>>>>>>> in a pipeline and because it's more future-proof (the author of the
>>>>>>>>>>> connector may later change it to use something else rather than Sink
>>>>>>>>>>> under the hood without breaking existing users).
>>>>>>>>>>>
>>>>>>>>>>> Sink is, currently, useful in the following case:
>>>>>>>>>>> - You're writing a bounded amount of data (we do not yet have an
>>>>>>>>>>>   unbounded Sink analogue)
>>>>>>>>>>> - The location you're writing to is known at pipeline construction
>>>>>>>>>>>   time, and does not depend on the data itself (support for
>>>>>>>>>>>   "data-dependent" sinks is on the radar
>>>>>>>>>>>   https://issues.apache.org/jira/browse/BEAM-92 )
>>>>>>>>>>> - The storage system you're writing to has a distinct
>>>>>>>>>>>   "initialization" and "finalization" step, allowing the write
>>>>>>>>>>>   operation to appear atomic (either all data is written or none).
>>>>>>>>>>>   This mostly applies to files (where writing is done by first
>>>>>>>>>>>   writing to a temporary directory, and then renaming all files to
>>>>>>>>>>>   their final location), but there can be other cases too.
>>>>>>>>>>>
>>>>>>>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>>>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>>>>>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
>>>>>>>>>>> DatastoreIO, BigtableIO etc.)
>>>>>>>>>>>
>>>>>>>>>>> I'm not familiar with the Flink API, however I'm a bit confused by
>>>>>>>>>>> your last paragraph: the Beam programming model is intentionally
>>>>>>>>>>> runner-agnostic, so that you can run exactly the same code on
>>>>>>>>>>> different runners.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> Please suggest me on what is the best way to write a Sink in Beam.  I see
>>>>>>>>>>>> that there is a Sink<T> abstract class which is in experimental state.
>>>>>>>>>>>> What is the expected outcome of this one? Do we have the api frozen, or
>>>>>>>>>>>> this could still change?  Most of the existing Sink implementations like
>>>>>>>>>>>> KafkaIO.Write are not using this interface, and instead extends
>>>>>>>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to extend
>>>>>>>>>>>> Sink<>.
>>>>>>>>>>>>
>>>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which mandates
>>>>>>>>>>>> that my implementation must also implement SinkFunction<>.  In that case,
>>>>>>>>>>>> none of the Sink<> methods get called anyway.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Sumit Chawla
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbonofre@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>>>
>>>>>
>>>>
>>>
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Suggestion for Writing Sink Implementation

Posted by Raghu Angadi <ra...@google.com.INVALID>.
KafkaIO: we should simply cache the producer and reuse it within a JVM. A
simple approach I plan to implement is to use a cache that expires after a
few minutes of inactivity. We could assign a unique id to each sink when it
is created, so that we would get the right Kafka producer configuration even
when there are multiple Kafka sinks with different configurations. The same
approach would work for you too. Not sure how you are currently avoiding
this with the Flink sink.
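
A rough sketch of the caching idea above, for illustration only. It uses
plain Java with no Kafka or Beam classes. ExpiringClientCache, FakeProducer
and the explicit nowMillis parameter are all made-up names, not part of
KafkaIO. The sketch also ignores the race where a client is evicted just
after another thread fetched it, which a real implementation would need to
handle.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical per-JVM cache of expensive clients, keyed by a unique sink id. */
final class ExpiringClientCache<C extends AutoCloseable> {

  /** A cached client together with the time it was last handed out. */
  private static final class Entry<C> {
    final C client;
    volatile long lastUsedMillis;

    Entry(C client, long nowMillis) {
      this.client = client;
      this.lastUsedMillis = nowMillis;
    }
  }

  private final Map<String, Entry<C>> entries = new ConcurrentHashMap<>();
  private final long idleTimeoutMillis;

  ExpiringClientCache(long idleTimeoutMillis) {
    this.idleTimeoutMillis = idleTimeoutMillis;
  }

  /** Returns the client cached for sinkId, creating it with the factory on first use. */
  C get(String sinkId, Supplier<C> factory, long nowMillis) {
    Entry<C> entry =
        entries.computeIfAbsent(sinkId, id -> new Entry<>(factory.get(), nowMillis));
    entry.lastUsedMillis = nowMillis;
    return entry.client;
  }

  /** Closes and removes every client idle for longer than the timeout; returns the count. */
  int evictIdle(long nowMillis) {
    int evicted = 0;
    for (Iterator<Entry<C>> it = entries.values().iterator(); it.hasNext(); ) {
      Entry<C> entry = it.next();
      if (nowMillis - entry.lastUsedMillis > idleTimeoutMillis) {
        it.remove();
        try {
          entry.client.close();
        } catch (Exception e) {
          throw new IllegalStateException("Failed to close idle client", e);
        }
        evicted++;
      }
    }
    return evicted;
  }

  int size() {
    return entries.size();
  }
}

/** Stand-in for a real producer; it only records how many were opened and closed. */
final class FakeProducer implements AutoCloseable {
  static int opened = 0;
  boolean closed = false;

  FakeProducer() {
    opened++;
  }

  @Override
  public void close() {
    closed = true;
  }
}
```

With something like this, each sink would call get() with its own id from
processElement(), and a background task would call evictIdle() periodically.
Keying by sink id is what keeps two sinks with different configurations from
sharing a producer.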

I think JB's sink is meant for bounded pipelines, where the sideInput trick
ensures finalize() is called once at the end of a work_item/split.

On Fri, Jul 29, 2016 at 12:35 PM, Chawla,Sumit <su...@gmail.com>
wrote:

> Hi Raghu
>
> My source is going to be unbounded (streaming), with writes to Cassandra.
> My only concern with KafkaIO.write() is that the producer is closed after
> every bundle, so every bundle may have to open a new connection to Kafka.
> (Please correct me if I am wrong: I am assuming a bundle is equivalent to
> the window size/mini-batch.)
>
> In Jean's implementation I see a different style of resource cleanup. Can
> someone please explain when that finalize method is called?
>
> Regards
> Sumit Chawla
>
>
> On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <rangadi@google.com.invalid>
> wrote:
>
> > It is the preferred pattern I think. Is your source bounded or unbounded
> > (i.e. streaming)? If it is latter, your sink could even be simpler than
> > JB's. e.g. KafkaIO.write() where it just writes the messages to Kafka in
> > processElement().
> >
> > The pros are pretty clear : runner independent, pure Beam, simpler code.
> > cons : no checkpoint/rollback, I don't know if Flink specific sink
> > provides this either.
> >
> > On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Any more comments on this pattern suggested by Jean?
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <klk@google.com.invalid>
> > > wrote:
> > >
> > > > What I said earlier is not quite accurate, though my advice is the same.
> > > > Here are the corrections:
> > > >
> > > >  - The Write transform actually has a too-general name, and Write.of(Sink)
> > > >    only really works for finite data. It re-windows into the global window
> > > >    and replaces any triggers.
> > > >  - So the special case in the Flink runner actually just _enables_ a (fake)
> > > >    Sink to work.
> > > >
> > > > We should probably rename Write to some more specific name that indicates
> > > > the particular strategy, and make it easier for a user to decide whether
> > > > that pattern is what they want. And the transform as-is should probably
> > > > reject unbounded inputs.
> > > >
> > > > So you should still proceed with implementation via ParDo and your own
> > > > logic. If you want some logic similar to Write (but with different
> > > > windowing and triggering) then it is a pretty simple composite to derive
> > > > something from.
> > > >
> > > > On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Jean
> > > > >
> > > > > This is an interesting pattern here.  I see that its implemented as
> > > > > PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
> > > > > Sink<T> interface.  Would love to hear more pros/cons of this pattern :) .
> > > > > Definitely it gives more control over connection initialization and
> > > > > cleanup.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > > >
> > > > > On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <jb@nanthrax.net>
> > > > > wrote:
> > > > >
> > > > > > Hi Sumit,
> > > > > >
> > > > > > I created a PR containing Cassandra IO with a sink:
> > > > > >
> > > > > > https://github.com/apache/incubator-beam/pull/592
> > > > > >
> > > > > > Maybe it can help you.
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > >
> > > > > > On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
> > > > > >
> > > > > >> Hi Kenneth
> > > > > >>
> > > > > >> Thanks for looking into it. I am currently trying to implement Sinks for
> > > > > >> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
> > > > > >> Flink Runner.
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> Regards
> > > > > >> Sumit Chawla
> > > > > >>
> > > > > >>
> > > > > >> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <klk@google.com.invalid>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Hi Sumit,
> > > > > >>>
> > > > > >>> I see what has happened here, from that snippet you pasted from the Flink
> > > > > >>> runner's code [1]. Thanks for looking into it!
> > > > > >>>
> > > > > >>> The Flink runner today appears to reject Write.Bounded transforms in
> > > > > >>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> > > > > >>> intent of that code, I believe, was to special case UnboundedFlinkSink to
> > > > > >>> make it easy to use an existing Flink sink, not to disable all other Write
> > > > > >>> transforms. What do you think, Max?
> > > > > >>>
> > > > > >>> Until we fix this issue, you should use ParDo transforms to do the writing.
> > > > > >>> If you can share a little about your sink, we may be able to suggest
> > > > > >>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> > > > > >>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
> > > > > >>>
> > > > > >>> Kenn
> > > > > >>>
> > > > > >>> [1]
> > > > > >>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
> > > > > >>>
> > > > > >>>
> > > > > >>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> > > > > >>> kirpichov@google.com.invalid> wrote:
> > > > > >>>
> > > > > >>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> > > > > >>>> runner, and I'll then defer to somebody familiar with it.
> > > > > >>>>
> > > > > >>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>>> Thanks a lot Eugene.
> > > > > >>>>>
> > > > > >>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
> > > > > >>>>>>>> mandates that my implementation must also implement SinkFunction<>.
> > > > > >>>>>>>> In that case, none of the Sink<> methods get called anyway.
> > > > > >>>>>
> > > > > >>>>> I am using FlinkRunner. The Sink implementation that i was writing by
> > > > > >>>>> extending Sink<> class had to implement Flink Specific SinkFunction for
> > > > > >>>>> the correct translation.
> > > > > >>>>>
> > > > > >>>>> private static class WriteSinkStreamingTranslator<T> implements
> > > > > >>>>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
> > > > > >>>>>
> > > > > >>>>>    @Override
> > > > > >>>>>    public void translateNode(Write.Bound<T> transform,
> > > > > >>>>>        FlinkStreamingTranslationContext context) {
> > > > > >>>>>      String name = transform.getName();
> > > > > >>>>>      PValue input = context.getInput(transform);
> > > > > >>>>>
> > > > > >>>>>      Sink<T> sink = transform.getSink();
> > > > > >>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
> > > > > >>>>>        throw new UnsupportedOperationException(
> > > > > >>>>>            "At the time, only unbounded Flink sinks are supported.");
> > > > > >>>>>      }
> > > > > >>>>>
> > > > > >>>>>      DataStream<WindowedValue<T>> inputDataSet =
> > > > > >>>>>          context.getInputDataStream(input);
> > > > > >>>>>
> > > > > >>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
> > > > > >>>>>        @Override
> > > > > >>>>>        public void flatMap(WindowedValue<T> value, Collector<Object> out)
> > > > > >>>>>            throws Exception {
> > > > > >>>>>          out.collect(value.getValue());
> > > > > >>>>>        }
> > > > > >>>>>      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
> > > > > >>>>>    }
> > > > > >>>>> }
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Regards
> > > > > >>>>> Sumit Chawla
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > > > > >>>>> kirpichov@google.com.invalid> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Sumit,
> > > > > >>>>>>
> > > > > >>>>>> All reusable parts of a pipeline, including connectors to storage
> > > > > >>>>>> systems, should be packaged as PTransform's.
> > > > > >>>>>>
> > > > > >>>>>> Sink is an advanced API that you can use under the hood to implement
> > > > > >>>>>> the transform, if this particular connector benefits from this API -
> > > > > >>>>>> but you don't have to, and many connectors indeed don't need it, and
> > > > > >>>>>> are simpler to implement just as wrappers around a couple of ParDo's
> > > > > >>>>>> writing the data.
> > > > > >>>>>>
> > > > > >>>>>> Even if the connector is implemented using a Sink, packaging the
> > > > > >>>>>> connector as a PTransform is important because it's easier to apply
> > > > > >>>>>> in a pipeline and because it's more future-proof (the author of the
> > > > > >>>>>> connector may later change it to use something else rather than Sink
> > > > > >>>>>> under the hood without breaking existing users).
> > > > > >>>>>>
> > > > > >>>>>> Sink is, currently, useful in the following case:
> > > > > >>>>>> - You're writing a bounded amount of data (we do not yet have an
> > > > > >>>>>>   unbounded Sink analogue)
> > > > > >>>>>> - The location you're writing to is known at pipeline construction
> > > > > >>>>>>   time, and does not depend on the data itself (support for
> > > > > >>>>>>   "data-dependent" sinks is on the radar
> > > > > >>>>>>   https://issues.apache.org/jira/browse/BEAM-92 )
> > > > > >>>>>> - The storage system you're writing to has a distinct
> > > > > >>>>>>   "initialization" and "finalization" step, allowing the write
> > > > > >>>>>>   operation to appear atomic (either all data is written or none).
> > > > > >>>>>>   This mostly applies to files (where writing is done by first
> > > > > >>>>>>   writing to a temporary directory, and then renaming all files to
> > > > > >>>>>>   their final location), but there can be other cases too.
> > > > > >>>>>>
> > > > > >>>>>> Here's an example GCP connector using the Sink API under the hood:
> > > > > >>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > > > > >>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
> > > > > >>>>>> DatastoreIO, BigtableIO etc.)
> > > > > >>>>>>
> > > > > >>>>>> I'm not familiar with the Flink API, however I'm a bit confused by
> > > > > >>>>>> your last paragraph: the Beam programming model is intentionally
> > > > > >>>>>> runner-agnostic, so that you can run exactly the same code on
> > > > > >>>>>> different runners.
> > > > > >>>>>>
> > > > > >>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hi
> > > > > >>>>>>>
> > > > > >>>>>>> Please suggest me on what is the best way to write a Sink in Beam.  I see
> > > > > >>>>>>> that there is a Sink<T> abstract class which is in experimental state.
> > > > > >>>>>>> What is the expected outcome of this one? Do we have the api frozen, or
> > > > > >>>>>>> this could still change?  Most of the existing Sink implementations like
> > > > > >>>>>>> KafkaIO.Write are not using this interface, and instead extends
> > > > > >>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to extend
> > > > > >>>>>>> Sink<>.
> > > > > >>>>>>>
> > > > > >>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which mandates
> > > > > >>>>>>> that my implementation must also implement SinkFunction<>.  In that case,
> > > > > >>>>>>> none of the Sink<> methods get called anyway.
> > > > > >>>>>>>
> > > > > >>>>>>> Regards
> > > > > >>>>>>> Sumit Chawla
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > > --
> > > > > > Jean-Baptiste Onofré
> > > > > > jbonofre@apache.org
> > > > > > http://blog.nanthrax.net
> > > > > > Talend - http://www.talend.com
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Suggestion for Writing Sink Implementation

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi Raghu

My source is going to be unbounded (streaming), with writes to Cassandra.
My only concern with KafkaIO.write() is that the producer is closed after
every bundle, so every bundle may have to open a new connection to Kafka.
(Please correct me if I am wrong: I am assuming a bundle is equivalent to
the window size / mini-batch.)

In Jean's implementation I see a different style of resource cleanup. Can
someone please explain when that finalize method is called?

Regards
Sumit Chawla
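
The connection concern above can be illustrated with a small stand-alone
model. This is only a sketch under stated assumptions: a bundle is treated
as an arbitrary group of elements chosen by the runner, and none of the
types below are Beam, Kafka or Cassandra classes. FakeConnection,
opensWithPerBundleConnection and opensWithReusedConnection are hypothetical
names that exist only to count how often a connection would be opened under
each cleanup style.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-alone model of a writer's lifecycle; these are NOT Beam classes. */
public class BundleLifecycleSketch {

    /** Hypothetical stand-in for a Kafka producer or a Cassandra session. */
    static final class FakeConnection implements AutoCloseable {
        FakeConnection(AtomicInteger opens) {
            opens.incrementAndGet(); // count every connection that gets opened
        }
        void write(String record) { /* a real client would send the record here */ }
        @Override public void close() { /* a real client would release resources here */ }
    }

    /** Opens at the start of each bundle and closes at its end. */
    public static int opensWithPerBundleConnection(List<List<String>> bundles) {
        AtomicInteger opens = new AtomicInteger();
        for (List<String> bundle : bundles) {
            try (FakeConnection conn = new FakeConnection(opens)) { // start of bundle
                for (String record : bundle) {
                    conn.write(record);                             // per element
                }
            }                                                       // end of bundle: closed
        }
        return opens.get();
    }

    /** Opens lazily once and keeps the connection across bundles. */
    public static int opensWithReusedConnection(List<List<String>> bundles) {
        AtomicInteger opens = new AtomicInteger();
        FakeConnection conn = null;
        for (List<String> bundle : bundles) {
            if (conn == null) {
                conn = new FakeConnection(opens); // first use only
            }
            for (String record : bundle) {
                conn.write(record);
            }
            // end of bundle: a real writer would flush here but stay connected
        }
        if (conn != null) {
            conn.close(); // released once, when the writer instance is discarded
        }
        return opens.get();
    }

    public static void main(String[] args) {
        List<List<String>> bundles = Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"));
        System.out.println(opensWithPerBundleConnection(bundles)); // prints 3
        System.out.println(opensWithReusedConnection(bundles));    // prints 1
    }
}
```

With three bundles, the per-bundle style opens three connections and the
reuse style opens one. Whether reuse is safe in a real sink depends on when
the runner discards the writer instance, which this model does not cover.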


On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <ra...@google.com.invalid>
wrote:

> It is the preferred pattern, I think. Is your source bounded or unbounded
> (i.e. streaming)? If it is the latter, your sink could be even simpler
> than JB's, e.g. KafkaIO.write(), which just writes the messages to Kafka
> in processElement().
>
> Pros (pretty clear): runner-independent, pure Beam, simpler code.
> Cons: no checkpoint/rollback. I don't know if the Flink-specific sink
> provides this either.

Re: Suggestion for Writing Sink Implementation

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
I prefer the "IO" approach, as it takes advantage of the advanced features
provided by the Beam model.

My $0.01

Regards
JB

On 07/29/2016 07:45 PM, Raghu Angadi wrote:
> It is the preferred pattern, I think. Is your source bounded or unbounded
> (i.e. streaming)? If it is the latter, your sink could be even simpler
> than JB's, e.g. KafkaIO.write(), which just writes the messages to Kafka
> in processElement().
>
> Pros (pretty clear): runner-independent, pure Beam, simpler code.
> Cons: no checkpoint/rollback. I don't know if the Flink-specific sink
> provides this either.

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Suggestion for Writing Sink Implementation

Posted by Raghu Angadi <ra...@google.com.INVALID>.
It is the preferred pattern, I think. Is your source bounded or unbounded
(i.e. streaming)? If it is the latter, your sink could be even simpler than
JB's, e.g. KafkaIO.write(), which just writes the messages to Kafka in
processElement().

Pros (pretty clear): runner-independent, pure Beam, simpler code.
Cons: no checkpoint/rollback. I don't know if the Flink-specific sink
provides this either.
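
The "no checkpoint/rollback" trade-off can be sketched with a stand-alone
model. This is a hedged illustration, not Beam or KafkaIO code: FakeStore,
processBundle and runWithOneRetry are hypothetical names, and the sketch
assumes that a runner reacts to a failure by re-running the whole bundle
from its first element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone model of writing each element as it arrives; NOT Beam classes. */
public class WritePerElementSketch {

    /** Hypothetical external store that records every write it receives. */
    static final class FakeStore {
        final List<String> rows = new ArrayList<>();
        void write(String row) { rows.add(row); }
    }

    /**
     * Mimics a processElement()-style writer: each element is written immediately.
     * Throws at index failAt to simulate a worker failure; pass -1 for no failure.
     */
    static void processBundle(List<String> bundle, FakeStore store, int failAt) {
        for (int i = 0; i < bundle.size(); i++) {
            if (i == failAt) {
                throw new IllegalStateException("simulated worker failure");
            }
            store.write(bundle.get(i));
        }
    }

    /** Runs the bundle and, if it fails, retries it once from the beginning. */
    public static List<String> runWithOneRetry(List<String> bundle, int failAt) {
        FakeStore store = new FakeStore();
        try {
            processBundle(bundle, store, failAt);
        } catch (IllegalStateException e) {
            // Writes made before the failure stay in the store: nothing rolls them back.
            processBundle(bundle, store, -1);
        }
        return store.rows;
    }

    public static void main(String[] args) {
        // The first attempt writes a and b, then fails; the retry writes a, b, c.
        System.out.println(runWithOneRetry(Arrays.asList("a", "b", "c"), 2));
        // prints [a, b, a, b, c]
    }
}
```

Under these assumptions the store ends up with duplicates of the elements
written before the failure, so a sink written this way is easiest to reason
about when its writes are idempotent (for example, upserts keyed by a
primary key).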

On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <su...@gmail.com>
wrote:

> Any more comments on this pattern suggested by Jean?
>
> Regards
> Sumit Chawla
>
>
> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <kl...@google.com.invalid>
> wrote:
>
> > What I said earlier is not quite accurate, though my advice is the same.
> > Here are the corrections:
> >
> >  - The Write transform actually has a too-general name, and
> Write.of(Sink)
> > only really works for finite data. It re-windows into the global window
> and
> > replaces any triggers.
> >  - So the special case in the Flink runner actually just _enables_ a
> (fake)
> > Sink to work.
> >
> > We should probably rename Write to some more specific name that indicates
> > the particular strategy, and make it easier for a user to decide whether
> > that pattern is what they want. And the transform as-is should probably
> > reject unbounded inputs.
> >
> > So you should still proceed with implementation via ParDo and your own
> > logic. If you want some logic similar to Write (but with different
> > windowing and triggering) then it is a pretty simple composite to derive
> > something from.
> >
> > On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Thanks Jean
> > >
> > > This is an interesting pattern here.  I see that its implemented as
> > > PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
> > > Sink<T> interface.  Would love to hear more pros/cons of this pattern
> :)
> > .
> > > Definitely it gives more control over connection initialization and
> > > cleanup.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <
> jb@nanthrax.net>
> > > wrote:
> > >
> > > > Hi Sumit,
> > > >
> > > > I created a PR containing Cassandra IO with a sink:
> > > >
> > > > https://github.com/apache/incubator-beam/pull/592
> > > >
> > > > Maybe it can help you.
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
> > > >
> > > >> Hi Kenneth
> > > >>
> > > >> Thanks for looking into it. I am currently trying to implement Sinks
> > for
> > > >> writing data into Cassandra/Titan DB.  My immediate goal is to run
> it
> > on
> > > >> Flink Runner.
> > > >>
> > > >>
> > > >>
> > > >> Regards
> > > >> Sumit Chawla
> > > >>
> > > >>
> > > >> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles
> > > <klk@google.com.invalid
> > > >> >
> > > >> wrote:
> > > >>
> > > >> Hi Sumit,
> > > >>>
> > > >>> I see what has happened here, from that snippet you pasted from the
> > > Flink
> > > >>> runner's code [1]. Thanks for looking into it!
> > > >>>
> > > >>> The Flink runner today appears to reject Write.Bounded transforms
> in
> > > >>> streaming mode if the sink is not an instance of
> UnboundedFlinkSink.
> > > The
> > > >>> intent of that code, I believe, was to special case
> > UnboundedFlinkSink
> > > to
> > > >>> make it easy to use an existing Flink sink, not to disable all
> other
> > > >>> Write
> > > >>> transforms. What do you think, Max?
> > > >>>
> > > >>> Until we fix this issue, you should use ParDo transforms to do the
> > > >>> writing.
> > > >>> If you can share a little about your sink, we may be able to
> suggest
> > > >>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> > > >>> transform is just a specialized pattern of ParDo's, not a Beam
> > > primitive.
> > > >>>
> > > >>> Kenn
> > > >>>
> > > >>> [1]
> > > >>>
> > > >>>
> > > >>>
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
> > > >>>
> > > >>>

Re: Suggestion for Writing Sink Implementation

Posted by "Chawla,Sumit " <su...@gmail.com>.
Any more comments on this pattern suggested by Jean?

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <kl...@google.com.invalid>
wrote:

> What I said earlier is not quite accurate, though my advice is the same.
> Here are the corrections:
>
>  - The Write transform actually has a too-general name, and Write.of(Sink)
> only really works for finite data. It re-windows into the global window and
> replaces any triggers.
>  - So the special case in the Flink runner actually just _enables_ a (fake)
> Sink to work.
>
> We should probably rename Write to some more specific name that indicates
> the particular strategy, and make it easier for a user to decide whether
> that pattern is what they want. And the transform as-is should probably
> reject unbounded inputs.
>
> So you should still proceed with implementation via ParDo and your own
> logic. If you want some logic similar to Write (but with different
> windowing and triggering) then it is a pretty simple composite to derive
> something from.
>

Re: Suggestion for Writing Sink Implementation

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
What I said earlier is not quite accurate, though my advice is the same.
Here are the corrections:

 - The Write transform actually has a too-general name, and Write.of(Sink)
only really works for finite data. It re-windows into the global window and
replaces any triggers.
 - So the special case in the Flink runner actually just _enables_ a (fake)
Sink to work.

We should probably rename Write to some more specific name that indicates
the particular strategy, and make it easier for a user to decide whether
that pattern is what they want. And the transform as-is should probably
reject unbounded inputs.

So you should still proceed with implementation via ParDo and your own
logic. If you want some logic similar to Write (but with different
windowing and triggering) then it is a pretty simple composite to derive
something from.
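
As a rough illustration of the ParDo route, here is a minimal plain-Java sketch of the lifecycle such a writing function typically follows: open a client once, buffer elements, flush full batches, and flush whatever remains when the bundle ends. It deliberately avoids Beam classes so that it runs on its own. FakeStoreClient, BatchingWriter, and the method names processElement/finishBundle are hypothetical stand-ins chosen for this sketch, not the Beam, Flink, or Cassandra API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a client of the external store (for example a DB session). */
final class FakeStoreClient implements AutoCloseable {
  final List<List<String>> committedBatches = new ArrayList<>();
  private boolean open = true;

  void writeBatch(List<String> batch) {
    if (!open) {
      throw new IllegalStateException("client is closed");
    }
    // Copy the batch so that later clearing of the caller's buffer does not affect it.
    committedBatches.add(new ArrayList<>(batch));
  }

  @Override
  public void close() {
    open = false;
  }
}

/** Models the steps a writing function would perform per bundle; not a Beam class. */
final class BatchingWriter {
  private final FakeStoreClient client;
  private final int maxBatchSize;
  private final List<String> buffer = new ArrayList<>();

  BatchingWriter(FakeStoreClient client, int maxBatchSize) {
    this.client = client;
    this.maxBatchSize = maxBatchSize;
  }

  /** Called once per element: buffer it and flush when the batch is full. */
  void processElement(String element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Called once at the end of a bundle: write out any partial batch. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    client.writeBatch(buffer);
    buffer.clear();
  }
}

final class BatchingWriterDemo {
  /** Runs one bundle through the writer and returns the batches the store received. */
  static List<List<String>> run(List<String> bundle, int maxBatchSize) {
    try (FakeStoreClient client = new FakeStoreClient()) {
      BatchingWriter writer = new BatchingWriter(client, maxBatchSize);
      for (String element : bundle) {
        writer.processElement(element);
      }
      writer.finishBundle();
      return client.committedBatches;
    }
  }

  public static void main(String[] args) {
    // prints [[a, b], [c, d], [e]]
    System.out.println(run(Arrays.asList("a", "b", "c", "d", "e"), 2));
  }
}
```

In a real pipeline the same steps would sit inside a DoFn that is wrapped in a PTransform, so the connector stays independent of the runner.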

On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <su...@gmail.com>
wrote:

> Thanks Jean
>
> This is an interesting pattern here.  I see that its implemented as
> PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
> Sink<T> interface.  Would love to hear more pros/cons of this pattern :) .
> Definitely it gives more control over connection initialization and
> cleanup.
>
> Regards
> Sumit Chawla
>
>

Re: Suggestion for Writing Sink Implementation

Posted by "Chawla,Sumit " <su...@gmail.com>.
Thanks Jean

This is an interesting pattern.  I see that it's implemented as a
PTransform, with constructs (WriteOperation/Writer) pretty similar to the
Sink<T> interface.  Would love to hear more pros/cons of this pattern :)
It definitely gives more control over connection initialization and cleanup.
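
For readers weighing that pattern, the sketch below models the initialize / per-bundle write / finalize shape in plain Java, using the temp-directory-then-rename example that Eugene describes in the quoted text further down. It is not the code from the PR and not the Beam Sink API. TempThenRenameWrite and its method names are hypothetical, and only java.nio.file calls are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of an initialize / write / finalize sequence (hypothetical names). */
final class TempThenRenameWrite {
  private final Path finalDir;
  private Path tempDir;

  TempThenRenameWrite(Path finalDir) {
    this.finalDir = finalDir;
  }

  /** Initialization: runs once, before any data is written. */
  void initialize() throws IOException {
    Files.createDirectories(finalDir);
    tempDir = Files.createTempDirectory(finalDir, "tmp-write-");
  }

  /** Writes one bundle to its own temporary file and returns that file as the result. */
  Path writeBundle(int bundleId, List<String> lines) throws IOException {
    Path tempFile = tempDir.resolve("bundle-" + bundleId + ".txt");
    Files.write(tempFile, lines, StandardCharsets.UTF_8);
    return tempFile;
  }

  /**
   * Finalization: runs once, after every bundle has succeeded. Moves each temporary
   * file to its final location, then removes the (now empty) temporary directory.
   * Expects the results of all bundles; otherwise the directory delete fails.
   */
  List<Path> finalizeWrite(List<Path> bundleResults) throws IOException {
    List<Path> published = new ArrayList<>();
    for (Path tempFile : bundleResults) {
      Path target = finalDir.resolve(tempFile.getFileName());
      Files.move(tempFile, target);
      published.add(target);
    }
    Files.delete(tempDir);
    return published;
  }
}

final class TempThenRenameDemo {
  /** Writes two bundles, finalizes, and reads the published lines back in order. */
  static List<String> run() {
    try {
      Path finalDir = Files.createTempDirectory("sink-demo-");
      TempThenRenameWrite operation = new TempThenRenameWrite(finalDir);
      operation.initialize();

      List<Path> results = new ArrayList<>();
      results.add(operation.writeBundle(0, Arrays.asList("a", "b")));
      results.add(operation.writeBundle(1, Arrays.asList("c")));

      List<String> allLines = new ArrayList<>();
      for (Path published : operation.finalizeWrite(results)) {
        allLines.addAll(Files.readAllLines(published, StandardCharsets.UTF_8));
      }
      return allLines;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a, b, c]
  }
}
```

The point of the split is that nothing appears under its final name until every bundle has finished. A store such as Cassandra has no equivalent rename step, which is one reason a plain ParDo-based writer may be the simpler fit there.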

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Sumit,
>
> I created a PR containing Cassandra IO with a sink:
>
> https://github.com/apache/incubator-beam/pull/592
>
> Maybe it can help you.
>
> Regards
> JB
>
>
> On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
>
>> Hi Kenneth
>>
>> Thanks for looking into it. I am currently trying to implement Sinks for
>> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
>> Flink Runner.
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <klk@google.com.invalid>
>> wrote:
>>
>>> Hi Sumit,
>>>
>>> I see what has happened here, from that snippet you pasted from the Flink
>>> runner's code [1]. Thanks for looking into it!
>>>
>>> The Flink runner today appears to reject Write.Bounded transforms in
>>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
>>> intent of that code, I believe, was to special case UnboundedFlinkSink to
>>> make it easy to use an existing Flink sink, not to disable all other Write
>>> transforms. What do you think, Max?
>>>
>>> Until we fix this issue, you should use ParDo transforms to do the writing.
>>> If you can share a little about your sink, we may be able to suggest
>>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
>>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>>>
>>> Kenn
>>>
>>> [1] https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>>
>>>
>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <kirpichov@google.com.invalid> wrote:
>>>
>>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>>>> runner, and I'll then defer to somebody familiar with it.
>>>>
>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Eugene.
>>>>>
>>>>> >>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>> >>> mandates that my implementation must also implement SinkFunction<>. In
>>>>> >>> that case, none of the Sink<> methods get called anyway.
>>>>>
>>>>> I am using FlinkRunner. The Sink implementation that i was writing by
>>>>> extending Sink<> class had to implement Flink Specific SinkFunction for
>>>>> the correct translation.
>>>>>
>>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>>     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
>>>>>
>>>>>    @Override
>>>>>    public void translateNode(Write.Bound<T> transform,
>>>>>        FlinkStreamingTranslationContext context) {
>>>>>      String name = transform.getName();
>>>>>      PValue input = context.getInput(transform);
>>>>>
>>>>>      Sink<T> sink = transform.getSink();
>>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>        throw new UnsupportedOperationException(
>>>>>            "At the time, only unbounded Flink sinks are supported.");
>>>>>      }
>>>>>
>>>>>      DataStream<WindowedValue<T>> inputDataSet =
>>>>>          context.getInputDataStream(input);
>>>>>
>>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>>>>>        @Override
>>>>>        public void flatMap(WindowedValue<T> value, Collector<Object> out)
>>>>>            throws Exception {
>>>>>          out.collect(value.getValue());
>>>>>        }
>>>>>      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
>>>>>    }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards
>>>>> Sumit Chawla
>>>>>
>>>>>
>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <kirpichov@google.com.invalid> wrote:
>>>>>
>>>>>> Hi Sumit,
>>>>>>
>>>>>> All reusable parts of a pipeline, including connectors to storage systems,
>>>>>> should be packaged as PTransform's.
>>>>>>
>>>>>> Sink is an advanced API that you can use under the hood to implement the
>>>>>> transform, if this particular connector benefits from this API - but you
>>>>>> don't have to, and many connectors indeed don't need it, and are simpler to
>>>>>> implement just as wrappers around a couple of ParDo's writing the data.
>>>>>>
>>>>>> Even if the connector is implemented using a Sink, packaging the connector
>>>>>> as a PTransform is important because it's easier to apply in a pipeline and
>>>>>> because it's more future-proof (the author of the connector may later
>>>>>> change it to use something else rather than Sink under the hood without
>>>>>> breaking existing users).
>>>>>>
>>>>>> Sink is, currently, useful in the following case:
>>>>>> - You're writing a bounded amount of data (we do not yet have an unbounded
>>>>>> Sink analogue)
>>>>>> - The location you're writing to is known at pipeline construction time,
>>>>>> and does not depend on the data itself (support for "data-dependent" sinks
>>>>>> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
>>>>>> - The storage system you're writing to has a distinct "initialization" and
>>>>>> "finalization" step, allowing the write operation to appear atomic (either
>>>>>> all data is written or none). This mostly applies to files (where writing
>>>>>> is done by first writing to a temporary directory, and then renaming all
>>>>>> files to their final location), but there can be other cases too.
>>>>>>
>>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO, DatastoreIO,
>>>>>> BigtableIO etc.)
>>>>>>
>>>>>> I'm not familiar with the Flink API, however I'm a bit confused by your
>>>>>> last paragraph: the Beam programming model is intentionally
>>>>>> runner-agnostic, so that you can run exactly the same code on different
>>>>>> runners.
>>>>>>
>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com
>>>>>>
>>>>>
>>>> wrote:
>>>>>>
>>>>>> Hi
>>>>>>>
>>>>>>> Please suggest me on what is the best way to write a Sink in
>>>>>>>
>>>>>> Beam.  I
>>>
>>>> see
>>>>>
>>>>>> that there is a Sink<T> abstract class which is in experimental
>>>>>>>
>>>>>> state.
>>>>
>>>>> What is the expected outcome of this one? Do we have the api
>>>>>>>
>>>>>> frozen,
>>>
>>>> or
>>>>
>>>>> this could still change?  Most of the existing Sink implementations
>>>>>>>
>>>>>> like
>>>>>
>>>>>> KafkaIO.Write are not using this interface, and instead extends
>>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
>>>>>>>
>>>>>> extend
>>>>>>
>>>>>>> Sink<>.
>>>>>>>
>>>>>>>
>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>>
>>>>>> mandates
>>>>>>
>>>>>>> that my implementation must also implement SinkFunction<>.  In that
>>>>>>>
>>>>>> case,
>>>>>
>>>>>> none of the Sink<> methods get called anyway.
>>>>>>>
>>>>>>> Regards
>>>>>>> Sumit Chawla
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: Suggestion for Writing Sink Implementation

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Sumit,

I created a PR containing Cassandra IO with a sink:

https://github.com/apache/incubator-beam/pull/592

Maybe it can help you.

Regards
JB

On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
> Hi Kenneth
>
> Thanks for looking into it. I am currently trying to implement Sinks for
> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
> Flink Runner.
>
>
>
> Regards
> Sumit Chawla
>
>
> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <kl...@google.com.invalid>
> wrote:
>
>> Hi Sumit,
>>
>> I see what has happened here, from that snippet you pasted from the Flink
>> runner's code [1]. Thanks for looking into it!
>>
>> The Flink runner today appears to reject Write.Bounded transforms in
>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
>> intent of that code, I believe, was to special case UnboundedFlinkSink to
>> make it easy to use an existing Flink sink, not to disable all other Write
>> transforms. What do you think, Max?
>>
>> Until we fix this issue, you should use ParDo transforms to do the writing.
>> If you can share a little about your sink, we may be able to suggest
>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>>
>> Kenn
>>
>> [1]
>>
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>>
>>
>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
>> kirpichov@google.com.invalid> wrote:
>>
>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>>> runner, and I'll then defer to somebody familiar with it.
>>>
>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com>
>>> wrote:
>>>
>>>> Thanks a lot Eugene.
>>>>
>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>> mandates that my implementation must also implement SinkFunction<>.  In
>>>> that >>>case, none of the Sink<> methods get called anyway.
>>>>
>>>> I am using FlinkRunner. The Sink implementation that i was writing by
>>>> extending Sink<> class had to implement Flink Specific SinkFunction for
>>> the
>>>> correct translation.
>>>>
>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>
>>>
>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
>>>> {
>>>>
>>>>    @Override
>>>>    public void translateNode(Write.Bound<T> transform,
>>>> FlinkStreamingTranslationContext context) {
>>>>      String name = transform.getName();
>>>>      PValue input = context.getInput(transform);
>>>>
>>>>      Sink<T> sink = transform.getSink();
>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
>>>>        throw new UnsupportedOperationException("At the time, only
>>>> unbounded Flink sinks are supported.");
>>>>      }
>>>>
>>>>      DataStream<WindowedValue<T>> inputDataSet =
>>>> context.getInputDataStream(input);
>>>>
>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>,
>> Object>()
>>> {
>>>>        @Override
>>>>        public void flatMap(WindowedValue<T> value, Collector<Object>
>>>> out) throws Exception {
>>>>          out.collect(value.getValue());
>>>>        }
>>>>      }).addSink(((UnboundedFlinkSink<Object>)
>>>> sink).getFlinkSource()).name(name);
>>>>    }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> Regards
>>>> Sumit Chawla
>>>>
>>>>
>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>>>> kirpichov@google.com.invalid> wrote:
>>>>
>>>>> Hi Sumit,
>>>>>
>>>>> All reusable parts of a pipeline, including connectors to storage
>>>> systems,
>>>>> should be packaged as PTransform's.
>>>>>
>>>>> Sink is an advanced API that you can use under the hood to implement
>>> the
>>>>> transform, if this particular connector benefits from this API - but
>>> you
>>>>> don't have to, and many connectors indeed don't need it, and are
>>> simpler
>>>> to
>>>>> implement just as wrappers around a couple of ParDo's writing the
>> data.
>>>>>
>>>>> Even if the connector is implemented using a Sink, packaging the
>>>> connector
>>>>> as a PTransform is important because it's easier to apply in a
>> pipeline
>>>> and
>>>>> because it's more future-proof (the author of the connector may later
>>>>> change it to use something else rather than Sink under the hood
>> without
>>>>> breaking existing users).
>>>>>
>>>>> Sink is, currently, useful in the following case:
>>>>> - You're writing a bounded amount of data (we do not yet have an
>>>> unbounded
>>>>> Sink analogue)
>>>>> - The location you're writing to is known at pipeline construction
>>> time,
>>>>> and does not depend on the data itself (support for "data-dependent"
>>>> sinks
>>>>> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
>>>>> - The storage system you're writing to has a distinct
>> "initialization"
>>>> and
>>>>> "finalization" step, allowing the write operation to appear atomic
>>>> (either
>>>>> all data is written or none). This mostly applies to files (where
>>> writing
>>>>> is done by first writing to a temporary directory, and then renaming
>>> all
>>>>> files to their final location), but there can be other cases too.
>>>>>
>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>
>>>>>
>>>>
>>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
>>>> DatastoreIO,
>>>>> BigtableIO etc.)
>>>>>
>>>>> I'm not familiar with the Flink API, however I'm a bit confused by
>> your
>>>>> last paragraph: the Beam programming model is intentionally
>>>>> runner-agnostic, so that you can run exactly the same code on
>> different
>>>>> runners.
>>>>>
>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com
>>>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Please suggest me on what is the best way to write a Sink in
>> Beam.  I
>>>> see
>>>>>> that there is a Sink<T> abstract class which is in experimental
>>> state.
>>>>>> What is the expected outcome of this one? Do we have the api
>> frozen,
>>> or
>>>>>> this could still change?  Most of the existing Sink implementations
>>>> like
>>>>>> KafkaIO.Write are not using this interface, and instead extends
>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
>>>>> extend
>>>>>> Sink<>.
>>>>>>
>>>>>>
>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>> mandates
>>>>>> that my implementation must also implement SinkFunction<>.  In that
>>>> case,
>>>>>> none of the Sink<> methods get called anyway.
>>>>>>
>>>>>> Regards
>>>>>> Sumit Chawla
>>>>>>
>>>>>
>>>>
>>>
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Suggestion for Writing Sink Implementation

Posted by "Chawla,Sumit " <su...@gmail.com>.
Hi Kenneth

Thanks for looking into it. I am currently trying to implement Sinks for
writing data into Cassandra/Titan DB.  My immediate goal is to run it on
Flink Runner.



Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <kl...@google.com.invalid>
wrote:

> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bounded transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other Write
> transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the writing.
> If you can share a little about your sink, we may be able to suggest
> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> kirpichov@google.com.invalid> wrote:
>
> > Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> > runner, and I'll then defer to somebody familiar with it.
> >
> > On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Thanks a lot Eugene.
> > >
> > > >>>My immediate requirement is to run this Sink on FlinkRunner. Which
> > > mandates that my implementation must also implement SinkFunction<>.  In
> > > that >>>case, none of the Sink<> methods get called anyway.
> > >
> > > I am using FlinkRunner. The Sink implementation that i was writing by
> > > extending Sink<> class had to implement Flink Specific SinkFunction for
> > the
> > > correct translation.
> > >
> > > private static class WriteSinkStreamingTranslator<T> implements
> > >
> >
> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
> > > {
> > >
> > >   @Override
> > >   public void translateNode(Write.Bound<T> transform,
> > > FlinkStreamingTranslationContext context) {
> > >     String name = transform.getName();
> > >     PValue input = context.getInput(transform);
> > >
> > >     Sink<T> sink = transform.getSink();
> > >     if (!(sink instanceof UnboundedFlinkSink)) {
> > >       throw new UnsupportedOperationException("At the time, only
> > > unbounded Flink sinks are supported.");
> > >     }
> > >
> > >     DataStream<WindowedValue<T>> inputDataSet =
> > > context.getInputDataStream(input);
> > >
> > >     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>,
> Object>()
> > {
> > >       @Override
> > >       public void flatMap(WindowedValue<T> value, Collector<Object>
> > > out) throws Exception {
> > >         out.collect(value.getValue());
> > >       }
> > >     }).addSink(((UnboundedFlinkSink<Object>)
> > > sink).getFlinkSource()).name(name);
> > >   }
> > > }
> > >
> > >
> > >
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > > kirpichov@google.com.invalid> wrote:
> > >
> > > > Hi Sumit,
> > > >
> > > > All reusable parts of a pipeline, including connectors to storage
> > > systems,
> > > > should be packaged as PTransform's.
> > > >
> > > > Sink is an advanced API that you can use under the hood to implement
> > the
> > > > transform, if this particular connector benefits from this API - but
> > you
> > > > don't have to, and many connectors indeed don't need it, and are
> > simpler
> > > to
> > > > implement just as wrappers around a couple of ParDo's writing the
> data.
> > > >
> > > > Even if the connector is implemented using a Sink, packaging the
> > > connector
> > > > as a PTransform is important because it's easier to apply in a
> pipeline
> > > and
> > > > because it's more future-proof (the author of the connector may later
> > > > change it to use something else rather than Sink under the hood
> without
> > > > breaking existing users).
> > > >
> > > > Sink is, currently, useful in the following case:
> > > > - You're writing a bounded amount of data (we do not yet have an
> > > unbounded
> > > > Sink analogue)
> > > > - The location you're writing to is known at pipeline construction
> > time,
> > > > and does not depend on the data itself (support for "data-dependent"
> > > sinks
> > > > is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> > > > - The storage system you're writing to has a distinct
> "initialization"
> > > and
> > > > "finalization" step, allowing the write operation to appear atomic
> > > (either
> > > > all data is written or none). This mostly applies to files (where
> > writing
> > > > is done by first writing to a temporary directory, and then renaming
> > all
> > > > files to their final location), but there can be other cases too.
> > > >
> > > > Here's an example GCP connector using the Sink API under the hood:
> > > >
> > > >
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > > > Most other non-file-based connectors, indeed, don't (KafkaIO,
> > > DatastoreIO,
> > > > BigtableIO etc.)
> > > >
> > > > I'm not familiar with the Flink API, however I'm a bit confused by
> your
> > > > last paragraph: the Beam programming model is intentionally
> > > > runner-agnostic, so that you can run exactly the same code on
> different
> > > > runners.
> > > >
> > > > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkchawla@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > Please suggest me on what is the best way to write a Sink in
> Beam.  I
> > > see
> > > > > that there is a Sink<T> abstract class which is in experimental
> > state.
> > > > > What is the expected outcome of this one? Do we have the api
> frozen,
> > or
> > > > > this could still change?  Most of the existing Sink implementations
> > > like
> > > > > KafkaIO.Write are not using this interface, and instead extends
> > > > > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> > > > extend
> > > > > Sink<>.
> > > > >
> > > > >
> > > > > My immediate requirement is to run this Sink on FlinkRunner. Which
> > > > mandates
> > > > > that my implementation must also implement SinkFunction<>.  In that
> > > case,
> > > > > none of the Sink<> methods get called anyway.
> > > > >
> > > > > Regards
> > > > > Sumit Chawla
> > > > >
> > > >
> > >
> >
>

Re: Suggestion for Writing Sink Implementation

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
Hi Sumit,

I see what has happened here, from that snippet you pasted from the Flink
runner's code [1]. Thanks for looking into it!

The Flink runner today appears to reject Write.Bound transforms in
streaming mode if the sink is not an instance of UnboundedFlinkSink. The
intent of that code, I believe, was to special case UnboundedFlinkSink to
make it easy to use an existing Flink sink, not to disable all other Write
transforms. What do you think, Max?

Until we fix this issue, you should use ParDo transforms to do the writing.
If you can share a little about your sink, we may be able to suggest
patterns for implementing it. Like Eugene said, the Write.of(Sink)
transform is just a specialized pattern of ParDo's, not a Beam primitive.
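
As a rough sketch of what "use ParDo transforms to do the writing" could look
like, here is the buffering logic in plain Java so that it runs without Beam
on the classpath. The method names mirror the DoFn bundle lifecycle
(startBundle, processElement, finishBundle), but BatchingWriter, its
constructor arguments, and the Consumer standing in for the database client
are all hypothetical. In a real pipeline this logic would live in a DoFn
applied via ParDo inside your own PTransform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of a writing DoFn; not part of the Beam SDK. */
class BatchingWriter<T> {
  private final int maxBatchSize;
  // Stand-in for a real client call (e.g. a batched insert); an assumption.
  private final Consumer<List<T>> client;
  private List<T> buffer;

  BatchingWriter(int maxBatchSize, Consumer<List<T>> client) {
    this.maxBatchSize = maxBatchSize;
    this.client = client;
  }

  /** Called once per bundle before any element: start with an empty buffer. */
  void startBundle() {
    buffer = new ArrayList<>();
  }

  /** Called per element: buffer it and flush when the batch is full. */
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Called once per bundle after the last element: flush what is left. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      client.accept(new ArrayList<>(buffer)); // hand over a copy of the batch
      buffer.clear();
    }
  }

  public static void main(String[] args) {
    List<List<String>> sent = new ArrayList<>();
    BatchingWriter<String> writer = new BatchingWriter<>(2, sent::add);
    writer.startBundle();
    for (String row : List.of("a", "b", "c")) {
      writer.processElement(row);
    }
    writer.finishBundle();
    System.out.println(sent); // prints [[a, b], [c]]
  }
}
```

Since a runner may retry a bundle, the writes issued from flush() should
ideally be idempotent (for example, upserts keyed on a primary key).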

Kenn

[1]
https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203


On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
kirpichov@google.com.invalid> wrote:

> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> runner, and I'll then defer to somebody familiar with it.
>
> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com>
> wrote:
>
> > Thanks a lot Eugene.
> >
> > >>>My immediate requirement is to run this Sink on FlinkRunner. Which
> > mandates that my implementation must also implement SinkFunction<>.  In
> > that >>>case, none of the Sink<> methods get called anyway.
> >
> > I am using FlinkRunner. The Sink implementation that i was writing by
> > extending Sink<> class had to implement Flink Specific SinkFunction for
> the
> > correct translation.
> >
> > private static class WriteSinkStreamingTranslator<T> implements
> >
> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
> > {
> >
> >   @Override
> >   public void translateNode(Write.Bound<T> transform,
> > FlinkStreamingTranslationContext context) {
> >     String name = transform.getName();
> >     PValue input = context.getInput(transform);
> >
> >     Sink<T> sink = transform.getSink();
> >     if (!(sink instanceof UnboundedFlinkSink)) {
> >       throw new UnsupportedOperationException("At the time, only
> > unbounded Flink sinks are supported.");
> >     }
> >
> >     DataStream<WindowedValue<T>> inputDataSet =
> > context.getInputDataStream(input);
> >
> >     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>()
> {
> >       @Override
> >       public void flatMap(WindowedValue<T> value, Collector<Object>
> > out) throws Exception {
> >         out.collect(value.getValue());
> >       }
> >     }).addSink(((UnboundedFlinkSink<Object>)
> > sink).getFlinkSource()).name(name);
> >   }
> > }
> >
> >
> >
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > kirpichov@google.com.invalid> wrote:
> >
> > > Hi Sumit,
> > >
> > > All reusable parts of a pipeline, including connectors to storage
> > systems,
> > > should be packaged as PTransform's.
> > >
> > > Sink is an advanced API that you can use under the hood to implement
> the
> > > transform, if this particular connector benefits from this API - but
> you
> > > don't have to, and many connectors indeed don't need it, and are
> simpler
> > to
> > > implement just as wrappers around a couple of ParDo's writing the data.
> > >
> > > Even if the connector is implemented using a Sink, packaging the
> > connector
> > > as a PTransform is important because it's easier to apply in a pipeline
> > and
> > > because it's more future-proof (the author of the connector may later
> > > change it to use something else rather than Sink under the hood without
> > > breaking existing users).
> > >
> > > Sink is, currently, useful in the following case:
> > > - You're writing a bounded amount of data (we do not yet have an
> > unbounded
> > > Sink analogue)
> > > - The location you're writing to is known at pipeline construction
> time,
> > > and does not depend on the data itself (support for "data-dependent"
> > sinks
> > > is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> > > - The storage system you're writing to has a distinct "initialization"
> > and
> > > "finalization" step, allowing the write operation to appear atomic
> > (either
> > > all data is written or none). This mostly applies to files (where
> writing
> > > is done by first writing to a temporary directory, and then renaming
> all
> > > files to their final location), but there can be other cases too.
> > >
> > > Here's an example GCP connector using the Sink API under the hood:
> > >
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > > Most other non-file-based connectors, indeed, don't (KafkaIO,
> > DatastoreIO,
> > > BigtableIO etc.)
> > >
> > > I'm not familiar with the Flink API, however I'm a bit confused by your
> > > last paragraph: the Beam programming model is intentionally
> > > runner-agnostic, so that you can run exactly the same code on different
> > > runners.
> > >
> > > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <su...@gmail.com>
> > > wrote:
> > >
> > > > Hi
> > > >
> > > > Please suggest me on what is the best way to write a Sink in Beam.  I
> > see
> > > > that there is a Sink<T> abstract class which is in experimental
> state.
> > > > What is the expected outcome of this one? Do we have the api frozen,
> or
> > > > this could still change?  Most of the existing Sink implementations
> > like
> > > > KafkaIO.Write are not using this interface, and instead extends
> > > > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> > > extend
> > > > Sink<>.
> > > >
> > > >
> > > > My immediate requirement is to run this Sink on FlinkRunner. Which
> > > mandates
> > > > that my implementation must also implement SinkFunction<>.  In that
> > case,
> > > > none of the Sink<> methods get called anyway.
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > >
> >
>

Re: Suggestion for Writing Sink Implementation

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Thanks Sumit. Looks like your question is, indeed, specific to the Flink
runner, and I'll then defer to somebody familiar with it.

On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com> wrote:

> Thanks a lot Eugene.
>
> >>>My immediate requirement is to run this Sink on FlinkRunner. Which
> mandates that my implementation must also implement SinkFunction<>.  In
> that >>>case, none of the Sink<> methods get called anyway.
>
> I am using FlinkRunner. The Sink implementation that i was writing by
> extending Sink<> class had to implement Flink Specific SinkFunction for the
> correct translation.
>
> private static class WriteSinkStreamingTranslator<T> implements
> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
> {
>
>   @Override
>   public void translateNode(Write.Bound<T> transform,
> FlinkStreamingTranslationContext context) {
>     String name = transform.getName();
>     PValue input = context.getInput(transform);
>
>     Sink<T> sink = transform.getSink();
>     if (!(sink instanceof UnboundedFlinkSink)) {
>       throw new UnsupportedOperationException("At the time, only
> unbounded Flink sinks are supported.");
>     }
>
>     DataStream<WindowedValue<T>> inputDataSet =
> context.getInputDataStream(input);
>
>     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>       @Override
>       public void flatMap(WindowedValue<T> value, Collector<Object>
> out) throws Exception {
>         out.collect(value.getValue());
>       }
>     }).addSink(((UnboundedFlinkSink<Object>)
> sink).getFlinkSource()).name(name);
>   }
> }
>
>
>
>
> Regards
> Sumit Chawla
>
>
> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> kirpichov@google.com.invalid> wrote:
>
> > Hi Sumit,
> >
> > All reusable parts of a pipeline, including connectors to storage
> systems,
> > should be packaged as PTransform's.
> >
> > Sink is an advanced API that you can use under the hood to implement the
> > transform, if this particular connector benefits from this API - but you
> > don't have to, and many connectors indeed don't need it, and are simpler
> to
> > implement just as wrappers around a couple of ParDo's writing the data.
> >
> > Even if the connector is implemented using a Sink, packaging the
> connector
> > as a PTransform is important because it's easier to apply in a pipeline
> and
> > because it's more future-proof (the author of the connector may later
> > change it to use something else rather than Sink under the hood without
> > breaking existing users).
> >
> > Sink is, currently, useful in the following case:
> > - You're writing a bounded amount of data (we do not yet have an
> unbounded
> > Sink analogue)
> > - The location you're writing to is known at pipeline construction time,
> > and does not depend on the data itself (support for "data-dependent"
> sinks
> > is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> > - The storage system you're writing to has a distinct "initialization"
> and
> > "finalization" step, allowing the write operation to appear atomic
> (either
> > all data is written or none). This mostly applies to files (where writing
> > is done by first writing to a temporary directory, and then renaming all
> > files to their final location), but there can be other cases too.
> >
> > Here's an example GCP connector using the Sink API under the hood:
> >
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > Most other non-file-based connectors, indeed, don't (KafkaIO,
> DatastoreIO,
> > BigtableIO etc.)
> >
> > I'm not familiar with the Flink API, however I'm a bit confused by your
> > last paragraph: the Beam programming model is intentionally
> > runner-agnostic, so that you can run exactly the same code on different
> > runners.
> >
> > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <su...@gmail.com>
> > wrote:
> >
> > > Hi
> > >
> > > Please suggest me on what is the best way to write a Sink in Beam.  I
> see
> > > that there is a Sink<T> abstract class which is in experimental state.
> > > What is the expected outcome of this one? Do we have the api frozen, or
> > > this could still change?  Most of the existing Sink implementations
> like
> > > KafkaIO.Write are not using this interface, and instead extends
> > > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> > extend
> > > Sink<>.
> > >
> > >
> > > My immediate requirement is to run this Sink on FlinkRunner. Which
> > mandates
> > > that my implementation must also implement SinkFunction<>.  In that
> case,
> > > none of the Sink<> methods get called anyway.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> >
>

Re: Suggestion for Writing Sink Implementation

Posted by "Chawla,Sumit " <su...@gmail.com>.
Thanks a lot Eugene.

> My immediate requirement is to run this Sink on FlinkRunner. Which
> mandates that my implementation must also implement SinkFunction<>.  In
> that case, none of the Sink<> methods get called anyway.

I am using FlinkRunner. The Sink implementation that I was writing by
extending the Sink<> class had to implement the Flink-specific SinkFunction
to be translated correctly.

private static class WriteSinkStreamingTranslator<T> implements
    FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {

  @Override
  public void translateNode(Write.Bound<T> transform,
      FlinkStreamingTranslationContext context) {
    String name = transform.getName();
    PValue input = context.getInput(transform);

    Sink<T> sink = transform.getSink();
    if (!(sink instanceof UnboundedFlinkSink)) {
      throw new UnsupportedOperationException(
          "At the time, only unbounded Flink sinks are supported.");
    }

    DataStream<WindowedValue<T>> inputDataSet =
        context.getInputDataStream(input);

    inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
      @Override
      public void flatMap(WindowedValue<T> value, Collector<Object> out)
          throws Exception {
        out.collect(value.getValue());
      }
    }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
  }
}




Regards
Sumit Chawla


On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
kirpichov@google.com.invalid> wrote:

> Hi Sumit,
>
> All reusable parts of a pipeline, including connectors to storage systems,
> should be packaged as PTransform's.
>
> Sink is an advanced API that you can use under the hood to implement the
> transform, if this particular connector benefits from this API - but you
> don't have to, and many connectors indeed don't need it, and are simpler to
> implement just as wrappers around a couple of ParDo's writing the data.
>
> Even if the connector is implemented using a Sink, packaging the connector
> as a PTransform is important because it's easier to apply in a pipeline and
> because it's more future-proof (the author of the connector may later
> change it to use something else rather than Sink under the hood without
> breaking existing users).
>
> Sink is, currently, useful in the following case:
> - You're writing a bounded amount of data (we do not yet have an unbounded
> Sink analogue)
> - The location you're writing to is known at pipeline construction time,
> and does not depend on the data itself (support for "data-dependent" sinks
> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> - The storage system you're writing to has a distinct "initialization" and
> "finalization" step, allowing the write operation to appear atomic (either
> all data is written or none). This mostly applies to files (where writing
> is done by first writing to a temporary directory, and then renaming all
> files to their final location), but there can be other cases too.
>
> Here's an example GCP connector using the Sink API under the hood:
>
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> Most other non-file-based connectors, indeed, don't (KafkaIO, DatastoreIO,
> BigtableIO etc.)
>
> I'm not familiar with the Flink API, however I'm a bit confused by your
> last paragraph: the Beam programming model is intentionally
> runner-agnostic, so that you can run exactly the same code on different
> runners.
>
> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <su...@gmail.com>
> wrote:
>
> > Hi
> >
> > Please suggest me on what is the best way to write a Sink in Beam.  I see
> > that there is a Sink<T> abstract class which is in experimental state.
> > What is the expected outcome of this one? Do we have the api frozen, or
> > this could still change?  Most of the existing Sink implementations like
> > KafkaIO.Write are not using this interface, and instead extends
> > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
> extend
> > Sink<>.
> >
> >
> > My immediate requirement is to run this Sink on FlinkRunner. Which
> mandates
> > that my implementation must also implement SinkFunction<>.  In that case,
> > none of the Sink<> methods get called anyway.
> >
> > Regards
> > Sumit Chawla
> >
>

Re: Suggestion for Writing Sink Implementation

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hi Sumit,

All reusable parts of a pipeline, including connectors to storage systems,
should be packaged as PTransform's.

Sink is an advanced API that you can use under the hood to implement the
transform, if this particular connector benefits from this API - but you
don't have to, and many connectors indeed don't need it, and are simpler to
implement just as wrappers around a couple of ParDo's writing the data.

Even if the connector is implemented using a Sink, packaging the connector
as a PTransform is important because it's easier to apply in a pipeline and
because it's more future-proof (the author of the connector may later
change it to use something else rather than Sink under the hood without
breaking existing users).

Sink is, currently, useful in the following case:
- You're writing a bounded amount of data (we do not yet have an unbounded
Sink analogue)
- The location you're writing to is known at pipeline construction time,
and does not depend on the data itself (support for "data-dependent" sinks
is on the radar https://issues.apache.org/jira/browse/BEAM-92)
- The storage system you're writing to has a distinct "initialization" and
"finalization" step, allowing the write operation to appear atomic (either
all data is written or none). This mostly applies to files (where writing
is done by first writing to a temporary directory, and then renaming all
files to their final location), but there can be other cases too.
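
To make the "initialization" / "finalization" idea concrete, here is a minimal
sketch of the temporary-directory-then-rename pattern in plain Java
(java.nio.file), not the Beam Sink API itself. The class and method names
(AtomicFileWrite, initialize, writeBundle, finalizeWrite) are hypothetical and
only illustrate the three phases.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the three write phases; not part of the Beam SDK. */
class AtomicFileWrite {
  private final Path finalDir;
  private final List<Path> tempFiles = new ArrayList<>();
  private Path tempDir;

  AtomicFileWrite(Path finalDir) {
    this.finalDir = finalDir;
  }

  /** Initialization: runs once, before any data is written. */
  void initialize() throws IOException {
    Files.createDirectories(finalDir);
    // Keep the temp directory under finalDir so the later renames stay on
    // one filesystem.
    tempDir = Files.createTempDirectory(finalDir, ".temp-");
  }

  /** Write phase: each bundle of records goes to its own temporary file. */
  void writeBundle(String bundleId, List<String> records) throws IOException {
    Path file = tempDir.resolve(bundleId + ".txt");
    Files.write(file, records, StandardCharsets.UTF_8);
    tempFiles.add(file);
  }

  /** Finalization: runs once, after every bundle has been written successfully. */
  void finalizeWrite() throws IOException {
    for (Path temp : tempFiles) {
      Files.move(temp, finalDir.resolve(temp.getFileName()),
          StandardCopyOption.ATOMIC_MOVE);
    }
    Files.delete(tempDir); // empty by now
  }

  public static void main(String[] args) throws IOException {
    Path out = Files.createTempDirectory("sink-demo");
    AtomicFileWrite write = new AtomicFileWrite(out);
    write.initialize();
    write.writeBundle("bundle-0", List.of("a", "b"));
    write.writeBundle("bundle-1", List.of("c"));
    write.finalizeWrite();
    System.out.println(Files.readAllLines(out.resolve("bundle-0.txt")));
  }
}
```

Nothing shows up at the final location until finalizeWrite() runs, so a
failure during the write phase leaves only temporary files behind. Each rename
is atomic on its own; the set of renames as a whole is not, which is why the
write only "appears" atomic.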

Here's an example GCP connector using the Sink API under the hood:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
Most other non-file-based connectors, indeed, don't (KafkaIO, DatastoreIO,
BigtableIO etc.)

I'm not familiar with the Flink API, however I'm a bit confused by your
last paragraph: the Beam programming model is intentionally
runner-agnostic, so that you can run exactly the same code on different
runners.

On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <su...@gmail.com> wrote:

> Hi
>
> Please suggest me on what is the best way to write a Sink in Beam.  I see
> that there is a Sink<T> abstract class which is in experimental state.
> What is the expected outcome of this one? Do we have the api frozen, or
> this could still change?  Most of the existing Sink implementations like
> KafkaIO.Write are not using this interface, and instead extends
> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to extend
> Sink<>.
>
>
> My immediate requirement is to run this Sink on FlinkRunner. Which mandates
> that my implementation must also implement SinkFunction<>.  In that case,
> none of the Sink<> methods get called anyway.
>
> Regards
> Sumit Chawla
>