You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2016/08/17 16:34:49 UTC

Re: Suggestion for Writing Sink Implementation

Hi Kenneth,

The problem is that the Write transform is not supported in streaming
execution of the Flink Runner because the streaming execution doesn't
currently support side inputs. PR is open to fix that..

Cheers,
Max

On Thu, Jul 28, 2016 at 8:56 PM, Kenneth Knowles <kl...@google.com.invalid> wrote:
> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bounded transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other Write
> transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the writing.
> If you can share a little about your sink, we may be able to suggest
> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
>
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> kirpichov@google.com.invalid> wrote:
>
>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>> runner, and I'll then defer to somebody familiar with it.
>>
>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <su...@gmail.com>
>> wrote:
>>
>> > Thanks a lot Eugene.
>> >
>> > >>>My immediate requirement is to run this Sink on FlinkRunner. Which
>> > mandates that my implementation must also implement SinkFunction<>.  In
>> > that >>>case, none of the Sink<> methods get called anyway.
>> >
>> > I am using FlinkRunner. The Sink implementation that i was writing by
>> > extending Sink<> class had to implement Flink Specific SinkFunction for
>> the
>> > correct translation.
>> >
>> > private static class WriteSinkStreamingTranslator<T> implements
>> >
>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
>> > {
>> >
>> >   @Override
>> >   public void translateNode(Write.Bound<T> transform,
>> > FlinkStreamingTranslationContext context) {
>> >     String name = transform.getName();
>> >     PValue input = context.getInput(transform);
>> >
>> >     Sink<T> sink = transform.getSink();
>> >     if (!(sink instanceof UnboundedFlinkSink)) {
>> >       throw new UnsupportedOperationException("At the time, only
>> > unbounded Flink sinks are supported.");
>> >     }
>> >
>> >     DataStream<WindowedValue<T>> inputDataSet =
>> > context.getInputDataStream(input);
>> >
>> >     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>()
>> {
>> >       @Override
>> >       public void flatMap(WindowedValue<T> value, Collector<Object>
>> > out) throws Exception {
>> >         out.collect(value.getValue());
>> >       }
>> >     }).addSink(((UnboundedFlinkSink<Object>)
>> > sink).getFlinkSource()).name(name);
>> >   }
>> > }
>> >
>> >
>> >
>> >
>> > Regards
>> > Sumit Chawla
>> >
>> >
>> > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>> > kirpichov@google.com.invalid> wrote:
>> >
>> > > Hi Sumit,
>> > >
>> > > All reusable parts of a pipeline, including connectors to storage
>> > systems,
>> > > should be packaged as PTransform's.
>> > >
>> > > Sink is an advanced API that you can use under the hood to implement
>> the
>> > > transform, if this particular connector benefits from this API - but
>> you
>> > > don't have to, and many connectors indeed don't need it, and are
>> simpler
>> > to
>> > > implement just as wrappers around a couple of ParDo's writing the data.
>> > >
>> > > Even if the connector is implemented using a Sink, packaging the
>> > connector
>> > > as a PTransform is important because it's easier to apply in a pipeline
>> > and
>> > > because it's more future-proof (the author of the connector may later
>> > > change it to use something else rather than Sink under the hood without
>> > > breaking existing users).
>> > >
>> > > Sink is, currently, useful in the following case:
>> > > - You're writing a bounded amount of data (we do not yet have an
>> > unbounded
>> > > Sink analogue)
>> > > - The location you're writing to is known at pipeline construction
>> time,
>> > > and does not depend on the data itself (support for "data-dependent"
>> > sinks
>> > > is on the radar https://issues.apache.org/jira/browse/BEAM-92)
>> > > - The storage system you're writing to has a distinct "initialization"
>> > and
>> > > "finalization" step, allowing the write operation to appear atomic
>> > (either
>> > > all data is written or none). This mostly applies to files (where
>> writing
>> > > is done by first writing to a temporary directory, and then renaming
>> all
>> > > files to their final location), but there can be other cases too.
>> > >
>> > > Here's an example GCP connector using the Sink API under the hood:
>> > >
>> > >
>> >
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>> > > Most other non-file-based connectors, indeed, don't (KafkaIO,
>> > DatastoreIO,
>> > > BigtableIO etc.)
>> > >
>> > > I'm not familiar with the Flink API, however I'm a bit confused by your
>> > > last paragraph: the Beam programming model is intentionally
>> > > runner-agnostic, so that you can run exactly the same code on different
>> > > runners.
>> > >
>> > > On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <su...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi
>> > > >
>> > > > Please suggest me on what is the best way to write a Sink in Beam.  I
>> > see
>> > > > that there is a Sink<T> abstract class which is in experimental
>> state.
>> > > > What is the expected outcome of this one? Do we have the api frozen,
>> or
>> > > > this could still change?  Most of the existing Sink implementations
>> > like
>> > > > KafkaIO.Write are not using this interface, and instead extends
>> > > > PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
>> > > extend
>> > > > Sink<>.
>> > > >
>> > > >
>> > > > My immediate requirement is to run this Sink on FlinkRunner. Which
>> > > mandates
>> > > > that my implementation must also implement SinkFunction<>.  In that
>> > case,
>> > > > none of the Sink<> methods get called anyway.
>> > > >
>> > > > Regards
>> > > > Sumit Chawla
>> > > >
>> > >
>> >
>>