Posted to dev@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2018/10/24 07:27:49 UTC

Re: Unbalanced FileIO writes on Flink

cc (dev)

I ran the example with the FlinkRunner in batch mode and again got a poor
data spread among the workers.

When I removed the number of shards for batch mode in the example above,
the pipeline crashed before launch:

Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))

On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jo...@gmail.com> wrote:

> Hi Max,
>
> I forgot to mention that the example runs in streaming mode, so I cannot
> write without specifying shards. FileIO explicitly asks for them.
>
> I am not sure where the problem is. The FlinkRunner is the only runner I have used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mx...@apache.org>
> wrote:
>
>> Hi Jozef,
>>
>> This does not look like a FlinkRunner related problem, but is caused by
>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>> which apparently does not lead to good data spread in your case.
>>
>> Do you see the same behavior without `withNumShards(5)`?
>>
>> Thanks,
>> Max
>>
>> On 22.10.18 11:57, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > I am having trouble getting balanced writes via FileIO. The workers on
>> > the shuffle side, where the data for each window firing is written to the
>> > filesystem, receive an unbalanced number of events.
>> >
>> > Here is a naive code example:
>> >
>> >      val read = KafkaIO.read()
>> >          .withTopic("topic")
>> >          .withBootstrapServers("kafka1:9092")
>> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >          .withProcessingTime()
>> >
>> >      pipeline
>> >          .apply(read)
>> >          .apply(MapElements.via(
>> >            new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >              override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>> >                new String(input.getKV.getValue, "UTF-8")
>> >              }
>> >            }))
>> >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >              .triggering(AfterWatermark.pastEndOfWindow()
>> >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>> >                          .plusDelayOf(Duration.standardHours(1)))))))
>> >              .discardingFiredPanes()
>> >              .withAllowedLateness(Duration.standardDays(7)))
>> >          .apply(FileIO.write()
>> >              .via(TextIO.sink())
>> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >              .withTempDirectory(tempLocation)
>> >              .withNumShards(5))
>> >
>> >
>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>> > number of shards), I would expect every worker to participate equally in
>> > persisting shards, since the code uses a fixed number of shards (and
>> > random shard assignment?). But reality is different (see the 2
>> > attachments: statistics from the Flink task reading from Kafka and from
>> > the task writing to files).
>> >
>> > What am I missing? How can I achieve balanced writes?
>> >
>> > Thanks,
>> > Jozef
>> >
>> >
>>
>
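
The sharding behaviour described in the quoted messages (records get a shard key, the shard keys are then shuffled to workers) can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Beam or Flink code: the class and method names (`ShardSpreadSketch`, `recordsPerWorker`, `busyWorkers`) are hypothetical, each record is assumed to get a uniformly random shard, and each shard is assumed to land on one uniformly random worker, which only approximates hashing shard keys onto partitions.

```java
import java.util.Arrays;
import java.util.Random;

public class ShardSpreadSketch {

    /**
     * Toy model: every record picks a random shard, and every shard is pinned
     * to a single worker. Returns the number of records each worker receives.
     */
    static long[] recordsPerWorker(int numShards, int numWorkers, int numRecords, long seed) {
        Random rnd = new Random(seed);

        // Assumption: a shard key ends up on a worker chosen uniformly at random.
        int[] workerOfShard = new int[numShards];
        for (int s = 0; s < numShards; s++) {
            workerOfShard[s] = rnd.nextInt(numWorkers);
        }

        long[] perWorker = new long[numWorkers];
        for (int r = 0; r < numRecords; r++) {
            int shard = rnd.nextInt(numShards);   // random shard assignment
            perWorker[workerOfShard[shard]]++;    // all records of a shard go to one worker
        }
        return perWorker;
    }

    /** Counts the workers that received at least one record. */
    static int busyWorkers(long[] perWorker) {
        int busy = 0;
        for (long count : perWorker) {
            if (count > 0) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        // Compare 5 shards against 50 shards on 5 workers.
        for (int shards : new int[] {5, 50}) {
            long[] counts = recordsPerWorker(shards, 5, 1_000_000, 42L);
            System.out.println(shards + " shards -> " + Arrays.toString(counts)
                + ", busy workers: " + busyWorkers(counts));
        }
    }
}
```

In this model a worker can only be busy if at least one shard was pinned to it, so with as many shards as workers any collision between two shards leaves another worker with nothing to write.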

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
> I was suggesting a transform like reshuffle that can avoid the actual reshuffle if the data is already well distributed

How do we know if the data is already well-distributed? Can't we simply 
give the user control over the shuffling behavior?

> and also provides some kind of unique key

Yes, that is what I meant by the "subtask index" in Flink.

> I don't recall why we made the choice of shard counts required in streaming mode. Perhaps because the bundles were too small (per key?) by default and we wanted to force more grouping?

The issue https://issues.apache.org/jira/browse/BEAM-1438 mentions too 
many files as the reason.


-Max

On 26.10.18 15:44, Robert Bradshaw wrote:
> We can't use Reshuffle for this, as there may be other reasons the
> user wants to actually force a reshuffle, but I was suggesting a
> transform like reshuffle that can avoid the actual reshuffle if the
> data is already well distributed, and also provides some kind of
> unique key (though perhaps just choosing a random nonce in
> start_bundle would be sufficient).
> 
> For sinks where we may need to retry writes, Reshuffle has been
> (ab)used to provide stable inputs, but for file-based sinks, this does
> not seem necessary. I don't recall why we made the choice of shard
> counts required in streaming mode. Perhaps because the bundles were too
> small (per key?) by default and we wanted to force more grouping?
> 
> On Fri, Oct 26, 2018 at 3:32 PM Maximilian Michels <mx...@apache.org> wrote:
>>
>> Actually, I don't think setting the number of shards by the Runner will
>> solve the problem. The shuffling logic still remains. And, as observed
>> by Jozef, it doesn't necessarily lead to balanced shards.
>>
>> The sharding logic of the Beam IO is handy but it shouldn't be strictly
>> necessary when the data is already partitioned nicely.
>>
>> It seems the sharding logic is primarily necessary because there is no
>> notion of a worker's ID in Beam. In Flink, you can retrieve the worker
>> ID at runtime and every worker just directly writes its results to a
>> file, suffixed by its worker id. This avoids any GroupByKey or Reshuffle.
>>
>> Robert, don't we already have Reshuffle which can be overridden? However,
>> it is not used by the WriteFiles code.
>>
>>
>> -Max
>>
>> On 26.10.18 11:41, Robert Bradshaw wrote:
>>> I think it's worth adding a URN for the operation of distributing
>>> "evenly" into an "appropriate" number of shards. A naive implementation
>>> would add random keys and do a ReshufflePerKey, but runners could
>>> override this to do a reshuffle and then key by whatever notion of
>>> bundle/worker/shard identifier they have that lines up with the number
>>> of actual workers.
>>>
>>> On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>
>>>      Thanks for the JIRA. If I understand it correctly, runner-determined
>>>      sharding will avoid the extra shuffle? Will it just write the data
>>>      available locally on a worker to its shard? Something similar to
>>>      coalesce in Spark?
>>>
>>>      On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mxm@apache.org> wrote:
>>>
>>>          Oh ok, thanks for the pointer. Coming from Flink, the default is that
>>>          the sharding is determined by the runtime distribution. Indeed, we will
>>>          have to add an override to the Flink Runner, similar to this one:
>>>
>>>          https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>>>
>>>          Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>>>
>>>          Thanks,
>>>          Max
>>>
>>>          On 25.10.18 22:37, Reuven Lax wrote:
>>>           > FYI the Dataflow runner automatically sets the default number of shards
>>>           > (I believe to be 2 * num_workers). Probably we should do something
>>>           > similar for the Flink runner.
>>>           >
>>>           > This needs to be done by the runner, as # of workers is a runner
>>>           > concept; the SDK itself has no concept of workers.
>>>           >
>>>           > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>           >
>>>           >     If I do not specify shards for an unbounded collection, I get
>>>           >
>>>           >     Caused by: java.lang.IllegalArgumentException: When applying
>>>           >     WriteFiles to an unbounded PCollection, must specify number of
>>>           >     output shards explicitly
>>>           >         at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>>>           >         at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>>>           >
>>>           >     Around the same lines in WriteFiles there is also a check for windowed
>>>           >     writes. I believe FileIO enables it explicitly when windowing is
>>>           >     present. In the filesystem, written files are per window and shard.
>>>           >
>>>           >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org> wrote:
>>>           >
>>>           >         I agree it would be nice to keep the current distribution of
>>>           >         elements instead of doing a shuffle based on an artificial shard key.
>>>           >
>>>           >         Have you tried `withWindowedWrites()`? Also, why do you say you
>>>           >         need to specify the number of shards in streaming mode?
>>>           >
>>>           >         -Max
>>>           >
>>>           >         On 25.10.18 10:12, Jozef Vilcek wrote:
>>>           >          > Hm, yes, this makes sense now, but what can be done for my
>>>           >          > case? I do not want to end up with too many files on disk.
>>>           >          >
>>>           >          > I think what I am looking for is a way to instruct the IO not
>>>           >          > to do another random sharding and reshuffle, but to just assume
>>>           >          > that the number of shards equals the number of workers and that
>>>           >          > the shard ID is the worker ID.
>>>           >          > Is this doable in the Beam model?
>>>           >          >
>>>           >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
>>>           >          >
>>>           >          >     The FlinkRunner uses a hash function (MurmurHash) on each
>>>           >          >     key which places keys somewhere in the hash space. The hash
>>>           >          >     space (2^32) is split among the partitions (5 in your case).
>>>           >          >     Given enough keys, the chance increases they are equally spread.
>>>           >          >
>>>           >          >     This should be similar to what the other Runners do.
>>>           >          >
>>>           >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>>>           >          >      >
>>>           >          >      > So if I run 5 workers with 50 shards, I end up with:
>>>           >          >      >
>>>           >          >      > Duration    Bytes received    Records received
>>>           >          >      >   2m 39s        900 MB            465,525
>>>           >          >      >   2m 39s       1.76 GB            930,720
>>>           >          >      >   2m 39s        789 MB            407,315
>>>           >          >      >   2m 39s       1.32 GB            698,262
>>>           >          >      >   2m 39s        788 MB            407,310
>>>           >          >      >
>>>           >          >      > Still not good, but better than with 5 shards, where some
>>>           >          >      > workers did not participate at all.
>>>           >          >      > So, the problem is in some layer which distributes keys /
>>>           >          >      > shards among workers?
>>>           >          >      >
>>>           >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
>>>           >          >      >
>>>           >          >      >     withNumShards(5) generates 5 random shards. It turns out
>>>           >          >      >     that statistically when you generate 5 random shards and
>>>           >          >      >     you have 5 workers, the probability is reasonably high
>>>           >          >      >     that some workers will get more than one shard (and as a
>>>           >          >      >     result not all workers will participate). Are you able to
>>>           >          >      >     set the number of shards larger than 5?
>>>           >          >      >
>>>
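
Reuven's remark in the quoted thread, that with 5 random shards on 5 workers some workers will probably get more than one shard, can be checked with a short calculation. The sketch below is not Beam or Flink code; `ShardOccupancy` and `probAllWorkersBusy` are hypothetical names, and it assumes each shard lands on a worker independently and uniformly at random, which is a simplification of hashing shard keys into the hash space. Under that assumption the chance that all 5 workers receive a shard is 5!/5^5 = 120/3125, about 3.84%.

```java
public class ShardOccupancy {

    /**
     * Probability that every worker receives at least one shard when each of
     * numShards shards is placed on one of numWorkers workers independently
     * and uniformly at random (an assumption of this sketch).
     */
    static double probAllWorkersBusy(int numShards, int numWorkers) {
        // Inclusion-exclusion over the number k of workers forced to stay empty:
        // sum over k of (-1)^k * C(numWorkers, k) * ((numWorkers - k) / numWorkers)^numShards
        double probability = 0.0;
        double binom = 1.0; // C(numWorkers, k), starting with k = 0
        for (int k = 0; k <= numWorkers; k++) {
            double term = binom * Math.pow((double) (numWorkers - k) / numWorkers, numShards);
            probability += (k % 2 == 0) ? term : -term;
            binom = binom * (numWorkers - k) / (k + 1); // advance to C(numWorkers, k + 1)
        }
        return probability;
    }

    public static void main(String[] args) {
        System.out.println("5 shards, 5 workers:  " + probAllWorkersBusy(5, 5));
        System.out.println("50 shards, 5 workers: " + probAllWorkersBusy(50, 5));
    }
}
```

Raising the shard count makes an idle worker unlikely in this model, but it does not by itself equalize the load: the number of shards per worker still varies, which is consistent with the uneven byte counts reported for 50 shards.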

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
> I was suggesting a transform like reshuffle that can avoid the actual reshuffle if the data is already well distributed

How do we know if the data is already well-distributed? Can't we simply 
give the user control over the shuffling behavior?

> and also provides some kind of unique key

Yes, that what I meant with the "subtask index" in Flink.

> I don't recall why we made the choice of shard counts required in streaming mode. Perhaps because the bundles were to small (per key?) by default and we wanted to force more grouping?

The issue https://issues.apache.org/jira/browse/BEAM-1438 mentions too 
many files as the reason.


-Max

On 26.10.18 15:44, Robert Bradshaw wrote:
> We can't use Reshuffle for this, as there may be other reasons the
> user wants to actually force a reshuffle, but I was suggesting a
> transform like reshuffle that can avoid the actual reshuffle if the
> data is already well distributed, and also provides some kind of
> unique key (though perhaps just choosing a random nonce in
> start_bundle would be sufficient).
> 
> For sinks where we may need to retry writes, Reshuffle has been
> (ab)used to provide stable inputs, but for file-based sinks, this does
> not seem necessary. I don't recall why we made the choice of shard
> counts required in streaming mode. Perhaps because the bundles were to
> small (per key?) by default and we wanted to force more grouping?
> 
> On Fri, Oct 26, 2018 at 3:32 PM Maximilian Michels <mx...@apache.org> wrote:
>>
>> Actually, I don't think setting the number of shards by the Runner will
>> solve the problem. The shuffling logic still remains. And, as observed
>> by Jozef, it doesn't necessarily lead to balanced shards.
>>
>> The sharding logic of the Beam IO is handy but it shouldn't be strictly
>> necessary when the data is already partitioned nicely.
>>
>> It seems the sharding logic is primarily necessary because there is no
>> notion of a worker's ID in Beam. In Flink, you can retrieve the worker
>> ID at runtime and every worker just directly writes its results to a
>> file, suffixed by its worker id. This avoids any GroupByKey or Reshuffle.
>>
>> Robert, don't we already have Reshuffle which can be overriden? However,
>> it is not used by the WritesFiles code.
>>
>>
>> -Max
>>
>> On 26.10.18 11:41, Robert Bradshaw wrote:
>>> I think it's worth adding a URN for the operation of distributing
>>> "evenly" into an "appropriate" number of shards. A naive implementation
>>> would add random keys and to a ReshufflePerKey, but runners could
>>> override this to do a reshuffle and then key by whatever notion of
>>> bundle/worker/shard identifier they have that lines up with the number
>>> of actual workers.
>>>
>>> On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vilcek@gmail.com
>>> <ma...@gmail.com>> wrote:
>>>
>>>      Thanks for the JIRA. If I understand it correctly ... so runner
>>>      determined sharding will avoid extra shuffle? Will it just write
>>>      worker local available data to it's shard? Something similar to
>>>      coalesce in Spark?
>>>
>>>      On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mxm@apache.org
>>>      <ma...@apache.org>> wrote:
>>>
>>>          Oh ok, thanks for the pointer. Coming from Flink, the default is
>>>          that
>>>          the sharding is determined by the runtime distribution. Indeed,
>>>          we will
>>>          have to add an overwrite to the Flink Runner, similar to this one:
>>>
>>>          https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>>>
>>>          Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>>>
>>>          Thanks,
>>>          Max
>>>
>>>          On 25.10.18 22:37, Reuven Lax wrote:
>>>           > FYI the Dataflow runner automatically sets the default number
>>>          of shards
>>>           > (I believe to be 2 * num_workers). Probably we should do
>>>          something
>>>           > similar for the Flink runner.
>>>           >
>>>           > This needs to be done by the runner, as # of workers is a runner
>>>           > concept; the SDK itself has no concept of workers.
>>>           >
>>>           > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>           >
>>>           >     If I do not specify shards for unbounded collection, I get
>>>           >
>>>           >     Caused by: java.lang.IllegalArgumentException: When applying
>>>           >     WriteFiles to an unbounded PCollection, must specify number of
>>>           >     output shards explicitly
>>>           >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>>>           >              at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>>>           >
>>>           >     Around the same lines in WriteFiles there is also a check for
>>>           >     windowed writes. I believe FileIO enables it explicitly when
>>>           >     windowing is present. On the filesystem, files are written per
>>>           >     window and shard.
>>>           >
>>>           >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org> wrote:
>>>           >
>>>           >         I agree it would be nice to keep the current distribution of
>>>           >         elements instead of doing a shuffle based on an artificial
>>>           >         shard key.
>>>           >
>>>           >         Have you tried `withWindowedWrites()`? Also, why do you say you
>>>           >         need to specify the number of shards in streaming mode?
>>>           >
>>>           >         -Max
>>>           >
>>>           >         On 25.10.18 10:12, Jozef Vilcek wrote:
>>>           >          > Hm, yes, this makes sense now, but what can be done for my
>>>           >          > case? I do not want to end up with too many files on disk.
>>>           >          >
>>>           >          > I think what I am looking for is a way to instruct the IO not
>>>           >          > to do another random sharding and reshuffle, but just to assume
>>>           >          > that the number of shards equals the number of workers and that
>>>           >          > the shard ID is the worker ID.
>>>           >          > Is this doable in the Beam model?
>>>           >          >
>>>           >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
>>>           >          >
>>>           >          >     The FlinkRunner uses a hash function (MurmurHash) on each
>>>           >          >     key which places keys somewhere in the hash space. The hash
>>>           >          >     space (2^32) is split among the partitions (5 in your case).
>>>           >          >     Given enough keys, the chance increases they are equally spread.
>>>           >          >
>>>           >          >     This should be similar to what the other Runners do.
>>>           >          >
>>>           >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>>>           >          >      >
>>>           >          >      > So if I run 5 workers with 50 shards, I end up with:
>>>           >          >      >
>>>           >          >      > Duration    Bytes received    Records received
>>>           >          >      >   2m 39s        900 MB            465,525
>>>           >          >      >   2m 39s       1.76 GB            930,720
>>>           >          >      >   2m 39s        789 MB            407,315
>>>           >          >      >   2m 39s       1.32 GB            698,262
>>>           >          >      >   2m 39s        788 MB            407,310
>>>           >          >      >
>>>           >          >      > Still not good, but better than with 5 shards, where some
>>>           >          >      > workers did not participate at all.
>>>           >          >      > So, the problem is in some layer which distributes keys /
>>>           >          >      > shards among workers?
>>>           >          >      >
>>>           >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
>>>           >          >      >
>>>           >          >      >     withNumShards(5) generates 5 random shards. It turns out that
>>>           >          >      >     statistically, when you generate 5 random shards and you have 5
>>>           >          >      >     workers, the probability is reasonably high that some workers will
>>>           >          >      >     get more than one shard (and as a result not all workers will
>>>           >          >      >     participate). Are you able to set the number of shards larger than 5?
>>>           >          >      >
>>>           >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>           >          >      >
>>>           >          >      >         cc (dev)
>>>           >          >      >
>>>           >          >      >         I tried to run the example with FlinkRunner in batch mode and
>>>           >          >      >         again got a bad data spread among the workers.
>>>           >          >      >
>>>           >          >      >         When I tried to remove the number of shards for batch mode in the
>>>           >          >      >         above example, the pipeline crashed before launch
>>>           >          >      >
>>>           >          >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
>>>           >          >      >         had incompatible triggers:
>>>           >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>>>           >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>>>           >          >      >
>>>           >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>>>           >          >      >
>>>           >          >      >             Hi Max,
>>>           >          >      >
>>>           >          >      >             I forgot to mention that the example is run in streaming mode,
>>>           >          >      >             therefore I cannot do writes without specifying shards.
>>>           >          >      >             FileIO explicitly asks for them.
>>>           >          >      >
>>>           >          >      >             I am not sure where the problem is. FlinkRunner is the only
>>>           >          >      >             one I used.
>>>           >          >      >
>>>           >          >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mxm@apache.org> wrote:
>>>           >          >      >
>>>           >          >      >                 Hi Jozef,
>>>           >          >      >
>>>           >          >      >                 This does not look like a FlinkRunner related problem, but is
>>>           >          >      >                 caused by the `WriteFiles` sharding logic. It assigns keys and
>>>           >          >      >                 does a Reshuffle which apparently does not lead to good data
>>>           >          >      >                 spread in your case.
>>>           >          >      >
>>>           >          >      >                 Do you see the same behavior without `withNumShards(5)`?
>>>           >          >      >
>>>           >          >      >                 Thanks,
>>>           >          >      >                 Max
>>>           >          >      >
>>>           >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>>>           >          >      >                  > Hello,
>>>           >          >      >                  >
>>>           >          >      >                  > I am having some trouble getting a balanced write via FileIO.
>>>           >          >      >                  > Workers on the shuffle side, where the data fired per window is
>>>           >          >      >                  > written to the filesystem, receive an unbalanced number of events.
>>>           >          >      >                  >
>>>           >          >      >                  > Here is a naive code example:
>>>           >          >      >                  >
>>>           >          >      >                  >      val read = KafkaIO.read()
>>>           >          >      >                  >          .withTopic("topic")
>>>           >          >      >                  >          .withBootstrapServers("kafka1:9092")
>>>           >          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>>           >          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>>>           >          >      >                  >          .withProcessingTime()
>>>           >          >      >                  >
>>>           >          >      >                  >      pipeline
>>>           >          >      >                  >          .apply(read)
>>>           >          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>>           >          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>>>           >          >      >                  >              new String(input.getKV.getValue, "UTF-8")
>>>           >          >      >                  >            }
>>>           >          >      >                  >          }))
>>>           >          >      >                  >
>>>           >          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>>           >          >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
>>>           >          >      >                  >                .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>>           >          >      >                  >                .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>>           >          >      >                  >                  Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>>           >          >      >                  >                  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>>>           >          >      >                  >              .discardingFiredPanes()
>>>           >          >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
>>>           >          >      >                  >
>>>           >          >      >                  >          .apply(FileIO.write()
>>>           >          >      >                  >              .via(TextIO.sink())
>>>           >          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>>           >          >      >                  >              .withTempDirectory(tempLocation)
>>>           >          >      >                  >              .withNumShards(5))
>>>           >          >      >                  >
>>>           >          >      >                  >
>>>           >          >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal
>>>           >          >      >                  > to the number of shards), I would expect each worker to participate
>>>           >          >      >                  > equally in persisting shards, since the code uses a fixed number of
>>>           >          >      >                  > shards (and random shard assignment?). But reality is different (see
>>>           >          >      >                  > the 2 attachments: statistics from the Flink task reading from Kafka
>>>           >          >      >                  > and from the task writing to files).
>>>           >          >      >                  >
>>>           >          >      >                  > What am I missing? How to achieve balanced writes?
>>>           >          >      >                  >
>>>           >          >      >                  > Thanks,
>>>           >          >      >                  > Jozef
>>>           >          >      >                  >
>>>           >          >      >                  >
>>>           >          >      >
>>>           >          >
>>>           >
>>>

Re: Unbalanced FileIO writes on Flink

Posted by Robert Bradshaw <ro...@google.com>.
We can't use Reshuffle for this, as there may be other reasons the
user wants to actually force a reshuffle, but I was suggesting a
transform like reshuffle that can avoid the actual reshuffle if the
data is already well distributed, and also provides some kind of
unique key (though perhaps just choosing a random nonce in
start_bundle would be sufficient).
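
A minimal sketch of the nonce idea, written in plain Python so it runs without a Beam installation. The class `KeyByBundleNonce`, its `start_bundle`/`process` methods, and the helper `run_bundle` only mimic the shape of a DoFn lifecycle; they are hypothetical illustrations, not existing Beam API. Each bundle draws one random key, so every element of a bundle shares that key and the data can stay wherever the runner already placed it:

```python
import uuid


class KeyByBundleNonce:
    """Hypothetical DoFn-like class: tags each element with a per-bundle key."""

    def start_bundle(self):
        # One fresh random nonce per bundle; assumed unique enough to
        # serve as a shard key.
        self._nonce = uuid.uuid4().hex

    def process(self, element):
        # Every element of the bundle gets the same key, so grouping by
        # key does not have to move data between workers.
        yield (self._nonce, element)


def run_bundle(fn, elements):
    """Simulates a runner processing one bundle on one worker."""
    fn.start_bundle()
    out = []
    for e in elements:
        out.extend(fn.process(e))
    return out


fn = KeyByBundleNonce()
bundle_a = run_bundle(fn, ["a1", "a2", "a3"])
bundle_b = run_bundle(fn, ["b1", "b2"])
```

In this sketch the number of distinct keys equals the number of bundles, so a real transform would still need some way to bound the file count.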

For sinks where we may need to retry writes, Reshuffle has been
(ab)used to provide stable inputs, but for file-based sinks, this does
not seem necessary. I don't recall why we chose to make shard counts
required in streaming mode. Perhaps because the bundles were too
small (per key?) by default and we wanted to force more grouping?
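
As a rough check on the imbalance reported earlier in this thread (5 shards on 5 workers), the sketch below assumes each shard key lands on a worker independently and uniformly at random. That is a simplification of what a hash partitioner does, and the function name `prob_all_workers_used` is made up for illustration. Under that assumption, the chance that every worker receives at least one shard is small when the shard count equals the worker count, and grows as shards are added:

```python
from math import comb


def prob_all_workers_used(num_shards, num_workers):
    """P(every worker receives >= 1 shard) under uniform independent placement.

    Uses inclusion-exclusion over the number of workers left empty.
    """
    total = 0.0
    for empty in range(num_workers + 1):
        sign = -1 if empty % 2 else 1
        total += sign * comb(num_workers, empty) * (
            (num_workers - empty) / num_workers
        ) ** num_shards
    return total


p5 = prob_all_workers_used(5, 5)    # equals 5!/5**5 = 0.0384
p50 = prob_all_workers_used(50, 5)  # very close to 1
```

Covering all workers is not the same as equal load: with 50 shards the number of shards per worker still varies, which is consistent with the 788 MB to 1.76 GB spread reported for that run.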

On Fri, Oct 26, 2018 at 3:32 PM Maximilian Michels <mx...@apache.org> wrote:
>
> Actually, I don't think setting the number of shards by the Runner will
> solve the problem. The shuffling logic still remains. And, as observed
> by Jozef, it doesn't necessarily lead to balanced shards.
>
> The sharding logic of the Beam IO is handy but it shouldn't be strictly
> necessary when the data is already partitioned nicely.
>
> It seems the sharding logic is primarily necessary because there is no
> notion of a worker's ID in Beam. In Flink, you can retrieve the worker
> ID at runtime and every worker just directly writes its results to a
> file, suffixed by its worker id. This avoids any GroupByKey or Reshuffle.
>
> Robert, don't we already have Reshuffle which can be overridden? However,
> it is not used by the WriteFiles code.
>
>
> -Max
>
> On 26.10.18 11:41, Robert Bradshaw wrote:
> > I think it's worth adding a URN for the operation of distributing
> > "evenly" into an "appropriate" number of shards. A naive implementation
> > would add random keys and do a ReshufflePerKey, but runners could
> > override this to do a reshuffle and then key by whatever notion of
> > bundle/worker/shard identifier they have that lines up with the number
> > of actual workers.
> >
> > On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> >
> >     Thanks for the JIRA. If I understand it correctly, runner-determined
> >     sharding will avoid the extra shuffle? Will it just write the data
> >     available locally on each worker to its shard? Something similar to
> >     coalesce in Spark?
> >
> >     On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> >         Oh ok, thanks for the pointer. Coming from Flink, the default is that
> >         the sharding is determined by the runtime distribution. Indeed, we will
> >         have to add an override to the Flink Runner, similar to this one:
> >
> >         https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
> >
> >         Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
> >
> >         Thanks,
> >         Max
> >
> >         On 25.10.18 22:37, Reuven Lax wrote:
> >          > FYI the Dataflow runner automatically sets the default number of shards
> >          > (I believe to be 2 * num_workers). Probably we should do something
> >          > similar for the Flink runner.
> >          >
> >          > This needs to be done by the runner, as # of workers is a runner
> >          > concept; the SDK itself has no concept of workers.
> >          >
> >          > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> >          >
> >          >     If I do not specify shards for unbounded collection, I get
> >          >
> >          >     Caused by: java.lang.IllegalArgumentException: When applying
> >          >     WriteFiles to an unbounded PCollection, must specify number of
> >          >     output shards explicitly
> >          >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> >          >              at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> >          >
> >          >     Around the same lines in WriteFiles there is also a check for
> >          >     windowed writes. I believe FileIO enables it explicitly when
> >          >     windowing is present. On the filesystem, files are written per
> >          >     window and shard.
> >          >
> >          >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org> wrote:
> >          >
> >          >         I agree it would be nice to keep the current distribution of
> >          >         elements instead of doing a shuffle based on an artificial
> >          >         shard key.
> >          >
> >          >         Have you tried `withWindowedWrites()`? Also, why do you say you
> >          >         need to specify the number of shards in streaming mode?
> >          >
> >          >         -Max
> >          >
> >          >         On 25.10.18 10:12, Jozef Vilcek wrote:
> >          >          > Hm, yes, this makes sense now, but what can be done for my
> >          >          > case? I do not want to end up with too many files on disk.
> >          >          >
> >          >          > I think what I am looking for is a way to instruct the IO not
> >          >          > to do another random sharding and reshuffle, but just to assume
> >          >          > that the number of shards equals the number of workers and that
> >          >          > the shard ID is the worker ID.
> >          >          > Is this doable in the Beam model?
> >          >          >
> >          >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
> >          >          >
> >          >          >     The FlinkRunner uses a hash function (MurmurHash) on each
> >          >          >     key which places keys somewhere in the hash space. The hash
> >          >          >     space (2^32) is split among the partitions (5 in your case).
> >          >          >     Given enough keys, the chance increases they are equally spread.
> >          >          >
> >          >          >     This should be similar to what the other Runners do.
> >          >          >
> >          >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
> >          >          >      >
> >          >          >      > So if I run 5 workers with 50 shards, I end up with:
> >          >          >      >
> >          >          >      > Duration    Bytes received    Records received
> >          >          >      >   2m 39s        900 MB            465,525
> >          >          >      >   2m 39s       1.76 GB            930,720
> >          >          >      >   2m 39s        789 MB            407,315
> >          >          >      >   2m 39s       1.32 GB            698,262
> >          >          >      >   2m 39s        788 MB            407,310
> >          >          >      >
> >          >          >      > Still not good, but better than with 5 shards, where some
> >          >          >      > workers did not participate at all.
> >          >          >      > So, the problem is in some layer which distributes keys /
> >          >          >      > shards among workers?
> >          >          >      >
> >          >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
> >          >          >      >
> >          >          >      >     withNumShards(5) generates 5 random shards. It turns out that
> >          >          >      >     statistically, when you generate 5 random shards and you have 5
> >          >          >      >     workers, the probability is reasonably high that some workers will
> >          >          >      >     get more than one shard (and as a result not all workers will
> >          >          >      >     participate). Are you able to set the number of shards larger than 5?
> >          >          >      >
> >          >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> >          >          >      >
> >          >          >      >         cc (dev)
> >          >          >      >
> >          >          >      >         I tried to run the example with FlinkRunner in batch mode and
> >          >          >      >         again got a bad data spread among the workers.
> >          >          >      >
> >          >          >      >         When I tried to remove the number of shards for batch mode in the
> >          >          >      >         above example, the pipeline crashed before launch
> >          >          >      >
> >          >          >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
> >          >          >      >         had incompatible triggers:
> >          >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> >          >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >          >          >      >
> >          >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> >          >          >      >
> >          >          >      >             Hi Max,
> >          >          >      >
> >          >          >      >             I forgot to mention that
> >         example is run in
> >          >         streaming
> >          >          >     mode,
> >          >          >      >             therefore I can not do writes
> >         without
> >          >         specifying shards.
> >          >          >      >             FileIO explicitly asks for them.
> >          >          >      >
> >          >          >      >             I am not sure where the problem is.
> >          >         FlinkRunner is
> >          >          >     only one
> >          >          >      >             I used.
> >          >          >      >
> >          >          >      >             On Tue, Oct 23, 2018 at 11:27 AM
> >          >          >      >             Maximilian Michels <mxm@apache.org> wrote:
> >          >          >      >
> >          >          >      >                 Hi Jozef,
> >          >          >      >
> >          >          >      >                 This does not look like a
> >         FlinkRunner
> >          >         related
> >          >          >     problem,
> >          >          >      >                 but is caused by
> >          >          >      >                 the `WriteFiles` sharding
> >         logic. It
> >          >         assigns keys and
> >          >          >      >                 does a Reshuffle
> >          >          >      >                 which apparently does not
> >         lead to good
> >          >         data spread in
> >          >          >      >                 your case.
> >          >          >      >
> >          >          >      >                 Do you see the same
> >         behavior without
> >          >          >     `withNumShards(5)`?
> >          >          >      >
> >          >          >      >                 Thanks,
> >          >          >      >                 Max
> >          >          >      >
> >          >          >      >                 On 22.10.18 11:57, Jozef
> >         Vilcek wrote:
> >          >          >      >                  > Hello,
> >          >          >      >                  >
> >          >          >      >                  > I am having some trouble
> >         to get a
> >          >         balanced
> >          >          >     write via
> >          >          >      >                 FileIO. Workers at
> >          >          >      >                  > the shuffle side where
> >         data per
> >          >         window fire are
> >          >          >      >                 written to the
> >          >          >      >                  > filesystem receive
> >         unbalanced
> >          >         number of events.
> >          >          >      >                  >
> >          >          >      >                  > Here is a naive code
> >         example:
> >          >          >      >                  >
> >          >          >      >                  >      val read = KafkaIO.read()
> >          >          >      >                  >          .withTopic("topic")
> >          >          >      >                  >          .withBootstrapServers("kafka1:9092")
> >          >          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >          >          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
> >          >          >      >                  >          .withProcessingTime()
> >          >          >      >                  >
> >          >          >      >                  >      pipeline
> >          >          >      >                  >          .apply(read)
> >          >          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >          >          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> >          >          >      >                  >              new String(input.getKV.getValue, "UTF-8")
> >          >          >      >                  >            }
> >          >          >      >                  >          }))
> >          >          >      >                  >
> >          >          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >          >          >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
> >          >          >      >                  >                .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >          >          >      >                  >                .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >          >          >      >                  >                  Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >          >          >      >                  >                  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >          >          >      >                  >              .discardingFiredPanes()
> >          >          >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
> >          >          >      >                  >
> >          >          >      >                  >          .apply(FileIO.write()
> >          >          >      >                  >              .via(TextIO.sink())
> >          >          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >          >          >      >                  >              .withTempDirectory(tempLocation)
> >          >          >      >                  >              .withNumShards(5))
> >          >          >      >                  >
> >          >          >      >                  >
> >          >          >      >                  > If I run this on Beam
> >         2.6.0 with
> >          >         Flink 1.5.0 on 5
> >          >          >      >                 workers (equal to
> >          >          >      >                  > number of shards), I
> >         would expect
> >          >         that each worker
> >          >          >      >                 will participate on
> >          >          >      >                  > persisting shards and
> >         equally,
> >          >         since code uses
> >          >          >     fixed
> >          >          >      >                 number of shards
> >          >          >      >                  > (and random shard
> >         assign?). But
> >          >         reality is
> >          >          >     different
> >          >          >      >                 (see 2 attachments
> >          >          >      >                  > - statistics from Flink
> >         task
> >          >         reading from
> >          >          >     kafka and
> >          >          >      >                 task writing to files)
> >          >          >      >                  >
> >          >          >      >                  > What am I missing? How
> >         to achieve
> >          >         balanced writes?
> >          >          >      >                  >
> >          >          >      >                  > Thanks,
> >          >          >      >                  > Jozef
> >          >          >      >                  >
> >          >          >      >                  >
> >          >          >      >
> >          >          >
> >          >
> >

Re: Unbalanced FileIO writes on Flink

Posted by Robert Bradshaw <ro...@google.com>.
We can't use Reshuffle for this, as there may be other reasons the
user wants to actually force a reshuffle, but I was suggesting a
transform like reshuffle that can avoid the actual reshuffle if the
data is already well distributed, and also provides some kind of
unique key (though perhaps just choosing a random nonce in
start_bundle would be sufficient).
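
To make the nonce idea concrete, here is a minimal sketch in plain Python with no Beam dependency. `NonceKeyer` and `key_bundles` are hypothetical names that only mimic the `start_bundle`/`process` lifecycle of a DoFn; they are not Beam API, and a bundle is assumed to be just a list of elements. The point is that each bundle gets a unique key without moving any data between workers.

```python
import uuid
from collections import defaultdict


class NonceKeyer:
    """Hypothetical stand-in for a DoFn; mimics start_bundle/process only."""

    def start_bundle(self):
        # One random nonce per bundle: unique without coordinating workers.
        self.nonce = uuid.uuid4().hex

    def process(self, element):
        # Key every element of the bundle by that bundle's nonce.
        return (self.nonce, element)


def key_bundles(bundles):
    """Key each bundle where it already lives and collect elements per nonce."""
    keyer = NonceKeyer()
    shards = defaultdict(list)
    for bundle in bundles:
        keyer.start_bundle()
        for element in bundle:
            key, value = keyer.process(element)
            shards[key].append(value)
    return dict(shards)


bundles = [["a", "b"], ["c"], ["d", "e", "f"]]
shards = key_bundles(bundles)
```

In this sketch the number of output shards equals the number of bundles, so it only helps if bundles are reasonably large, which is the open question raised in the next paragraph.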

For sinks where we may need to retry writes, Reshuffle has been
(ab)used to provide stable inputs, but for file-based sinks, this does
not seem necessary. I don't recall why we made the choice of shard
counts required in streaming mode. Perhaps because the bundles were too
small (per key?) by default and we wanted to force more grouping?

On Fri, Oct 26, 2018 at 3:32 PM Maximilian Michels <mx...@apache.org> wrote:
>
> Actually, I don't think setting the number of shards by the Runner will
> solve the problem. The shuffling logic still remains. And, as observed
> by Jozef, it doesn't necessarily lead to balanced shards.
>
> The sharding logic of the Beam IO is handy but it shouldn't be strictly
> necessary when the data is already partitioned nicely.
>
> It seems the sharding logic is primarily necessary because there is no
> notion of a worker's ID in Beam. In Flink, you can retrieve the worker
> ID at runtime and every worker just directly writes its results to a
> file, suffixed by its worker id. This avoids any GroupByKey or Reshuffle.
>
> Robert, don't we already have Reshuffle which can be overridden? However,
> it is not used by the WriteFiles code.
>
>
> -Max
>
> On 26.10.18 11:41, Robert Bradshaw wrote:
> > I think it's worth adding a URN for the operation of distributing
> > "evenly" into an "appropriate" number of shards. A naive implementation
> > would add random keys and do a ReshufflePerKey, but runners could
> > override this to do a reshuffle and then key by whatever notion of
> > bundle/worker/shard identifier they have that lines up with the number
> > of actual workers.
> >
> > On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> >
> >     Thanks for the JIRA. If I understand it correctly ... so
> >     runner-determined sharding will avoid the extra shuffle? Will it just
> >     write each worker's locally available data to its own shard?
> >     Something similar to coalesce in Spark?
> >
> >     On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> >         Oh ok, thanks for the pointer. Coming from Flink, the default is
> >         that the sharding is determined by the runtime distribution.
> >         Indeed, we will have to add an override to the Flink Runner,
> >         similar to this one:
> >
> >         https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
> >
> >         Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
> >
> >         Thanks,
> >         Max
> >
> >         On 25.10.18 22:37, Reuven Lax wrote:
> >          > FYI the Dataflow runner automatically sets the default number
> >         of shards
> >          > (I believe to be 2 * num_workers). Probably we should do
> >         something
> >          > similar for the Flink runner.
> >          >
> >          > This needs to be done by the runner, as # of workers is a runner
> >          > concept; the SDK itself has no concept of workers.
> >          >
> >          > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek
> >          > <jozo.vilcek@gmail.com> wrote:
> >          >
> >          >     If I do not specify shards for unbounded collection, I get
> >          >
> >          >     Caused by: java.lang.IllegalArgumentException: When applying
> >          >     WriteFiles to an unbounded PCollection, must specify number of
> >          >     output shards explicitly
> >          >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> >          >              at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> >          >
> >          >     Around the same lines in WriteFiles there is also a check for
> >          >     windowed writes. I believe FileIO enables it explicitly when
> >          >     windowing is present. On the filesystem, files are written per
> >          >     window and shard.
> >          >
> >          >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels
> >          >     <mxm@apache.org> wrote:
> >          >
> >          >         I agree it would be nice to keep the current
> >         distribution of
> >          >         elements
> >          >         instead of doing a shuffle based on an artificial
> >         shard key.
> >          >
> >          >         Have you tried `withWindowedWrites()`? Also, why do
> >         you say you
> >          >         need to
> >          >         specify the number of shards in streaming mode?
> >          >
> >          >         -Max
> >          >
> >          >         On 25.10.18 10:12, Jozef Vilcek wrote:
> >          >          > Hm, yes, this makes sense now, but what can be done for my
> >          >          > case? I do not want to end up with too many files on disk.
> >          >          >
> >          >          > I think what I am looking for is a way to instruct the IO
> >          >          > not to do the random sharding and reshuffle again, but to
> >          >          > just assume that the number of shards equals the number of
> >          >          > workers and that the shard ID is the worker ID.
> >          >          > Is this doable in the Beam model?
> >          >          >
> >          >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels
> >          >          > <mxm@apache.org> wrote:
> >          >          >
> >          >          >     The FlinkRunner uses a hash function
> >         (MurmurHash) on each
> >          >         key which
> >          >          >     places keys somewhere in the hash space. The
> >         hash space
> >          >         (2^32) is split
> >          >          >     among the partitions (5 in your case). Given
> >         enough keys,
> >          >         the chance
> >          >          >     increases they are equally spread.
> >          >          >
> >          >          >     This should be similar to what the other
> >         Runners do.
> >          >          >
> >          >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
> >          >          >      >
> >          >          >      > So if I run 5 workers with 50 shards, I end
> >         up with:
> >          >          >      >
> >          >          >      > Duration  Bytes received  Records received
> >          >          >      >   2m 39s          900 MB           465,525
> >          >          >      >   2m 39s         1.76 GB           930,720
> >          >          >      >   2m 39s          789 MB           407,315
> >          >          >      >   2m 39s         1.32 GB           698,262
> >          >          >      >   2m 39s          788 MB           407,310
> >          >          >      >
> >          >          >      > Still not good but better than with 5
> >         shards where
> >          >         some workers
> >          >          >     did not
> >          >          >      > participate at all.
> >          >          >      > So, problem is in some layer which
> >         distributes keys /
> >          >         shards
> >          >          >     among workers?
> >          >          >      >
> >          >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax
> >          >          >      > <relax@google.com> wrote:
> >          >          >      >
> >          >          >      >     withNumShards(5) generates 5 random
> >         shards. It
> >          >         turns out that
> >          >          >      >     statistically when you generate 5
> >         random shards
> >          >         and you have 5
> >          >          >      >     workers, the probability is reasonably
> >         high that
> >          >         some workers
> >          >          >     will get
> >          >          >      >     more than one shard (and as a result
> >         not all
> >          >         workers will
> >          >          >      >     participate). Are you able to set the
> >         number of
> >          >         shards larger
> >          >          >     than 5?
> >          >          >      >
> >          >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
> >          >          >      >     <jozo.vilcek@gmail.com> wrote:
> >          >          >      >
> >          >          >      >         cc (dev)
> >          >          >      >
> >          >          >      >         I tried to run the example with
> >         FlinkRunner in
> >          >         batch mode and
> >          >          >      >         received again bad data spread
> >         among the workers.
> >          >          >      >
> >          >          >      >         When I tried to remove number of
> >         shards for
> >          >         batch mode in
> >          >          >     above
> >          >          >      >         example, pipeline crashed before launch
> >          >          >      >
> >          >          >      >         Caused by:
> >         java.lang.IllegalStateException:
> >          >         Inputs to Flatten
> >          >          >      >         had incompatible triggers:
> >          >          >      >
> >          >          >
> >          >
> >           AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> >          >          >      >         entCountAtLeast(10000)),
> >          >          >      >
> >          >          >
> >          >
> >           Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> >          >          >      >         hour)))),
> >          >          >      >
> >          >          >
> >          >
> >           AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> >          >          >      >
> >           rever(AfterPane.elementCountAtLeast(1)),
> >          >          >      >
> >          >          >
> >          >
> >           Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >          >          >      >
> >          >          >      >
> >          >          >      >
> >          >          >      >
> >          >          >      >
> >          >          >      >         On Tue, Oct 23, 2018 at 12:01 PM
> >          >          >      >         Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> >          >          >      >
> >          >          >      >             Hi Max,
> >          >          >      >
> >          >          >      >             I forgot to mention that
> >         example is run in
> >          >         streaming
> >          >          >     mode,
> >          >          >      >             therefore I can not do writes
> >         without
> >          >         specifying shards.
> >          >          >      >             FileIO explicitly asks for them.
> >          >          >      >
> >          >          >      >             I am not sure where the problem is.
> >          >         FlinkRunner is
> >          >          >     only one
> >          >          >      >             I used.
> >          >          >      >
> >          >          >      >             On Tue, Oct 23, 2018 at 11:27 AM
> >          >          >      >             Maximilian Michels <mxm@apache.org> wrote:
> >          >          >      >
> >          >          >      >                 Hi Jozef,
> >          >          >      >
> >          >          >      >                 This does not look like a
> >         FlinkRunner
> >          >         related
> >          >          >     problem,
> >          >          >      >                 but is caused by
> >          >          >      >                 the `WriteFiles` sharding
> >         logic. It
> >          >         assigns keys and
> >          >          >      >                 does a Reshuffle
> >          >          >      >                 which apparently does not
> >         lead to good
> >          >         data spread in
> >          >          >      >                 your case.
> >          >          >      >
> >          >          >      >                 Do you see the same
> >         behavior without
> >          >          >     `withNumShards(5)`?
> >          >          >      >
> >          >          >      >                 Thanks,
> >          >          >      >                 Max
> >          >          >      >
> >          >          >      >                 On 22.10.18 11:57, Jozef
> >         Vilcek wrote:
> >          >          >      >                  > Hello,
> >          >          >      >                  >
> >          >          >      >                  > I am having some trouble
> >         to get a
> >          >         balanced
> >          >          >     write via
> >          >          >      >                 FileIO. Workers at
> >          >          >      >                  > the shuffle side where
> >         data per
> >          >         window fire are
> >          >          >      >                 written to the
> >          >          >      >                  > filesystem receive
> >         unbalanced
> >          >         number of events.
> >          >          >      >                  >
> >          >          >      >                  > Here is a naive code
> >         example:
> >          >          >      >                  >
> >          >          >      >                  >      val read = KafkaIO.read()
> >          >          >      >                  >          .withTopic("topic")
> >          >          >      >                  >          .withBootstrapServers("kafka1:9092")
> >          >          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >          >          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
> >          >          >      >                  >          .withProcessingTime()
> >          >          >      >                  >
> >          >          >      >                  >      pipeline
> >          >          >      >                  >          .apply(read)
> >          >          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >          >          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> >          >          >      >                  >              new String(input.getKV.getValue, "UTF-8")
> >          >          >      >                  >            }
> >          >          >      >                  >          }))
> >          >          >      >                  >
> >          >          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >          >          >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
> >          >          >      >                  >                .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >          >          >      >                  >                .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >          >          >      >                  >                  Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >          >          >      >                  >                  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >          >          >      >                  >              .discardingFiredPanes()
> >          >          >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
> >          >          >      >                  >
> >          >          >      >                  >          .apply(FileIO.write()
> >          >          >      >                  >              .via(TextIO.sink())
> >          >          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >          >          >      >                  >              .withTempDirectory(tempLocation)
> >          >          >      >                  >              .withNumShards(5))
> >          >          >      >                  >
> >          >          >      >                  >
> >          >          >      >                  > If I run this on Beam
> >         2.6.0 with
> >          >         Flink 1.5.0 on 5
> >          >          >      >                 workers (equal to
> >          >          >      >                  > number of shards), I
> >         would expect
> >          >         that each worker
> >          >          >      >                 will participate on
> >          >          >      >                  > persisting shards and
> >         equally,
> >          >         since code uses
> >          >          >     fixed
> >          >          >      >                 number of shards
> >          >          >      >                  > (and random shard
> >         assign?). But
> >          >         reality is
> >          >          >     different
> >          >          >      >                 (see 2 attachments
> >          >          >      >                  > - statistics from Flink
> >         task
> >          >         reading from
> >          >          >     kafka and
> >          >          >      >                 task writing to files)
> >          >          >      >                  >
> >          >          >      >                  > What am I missing? How
> >         to achieve
> >          >         balanced writes?
> >          >          >      >                  >
> >          >          >      >                  > Thanks,
> >          >          >      >                  > Jozef
> >          >          >      >                  >
> >          >          >      >                  >
> >          >          >      >
> >          >          >
> >          >
> >

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
Actually, I don't think setting the number of shards by the Runner will 
solve the problem. The shuffling logic still remains. And, as observed 
by Jozef, it doesn't necessarily lead to balanced shards.
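
To put a rough number on that imbalance, here is a minimal sketch in plain Scala (no Beam or Flink dependency). It assumes, as a simplification, that every shard key lands on a worker uniformly and independently at random, which only approximates hashing keys into Flink's partitions. `ShardSpread`, `choose` and `probAllWorkersBusy` are hypothetical names made up for this illustration. Under that assumption the chance that 5 shards keep all 5 workers busy is 5!/5^5, about 3.8%.

```scala
object ShardSpread {
  /** Binomial coefficient C(n, k) as a Double; exact for the small n used here. */
  def choose(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  /** Probability that each of `workers` receives at least one of `shards`.
    * ASSUMPTION: shards land on workers uniformly and independently at random.
    * Computed by inclusion-exclusion over the number j of workers left empty. */
  def probAllWorkersBusy(shards: Int, workers: Int): Double =
    (0 to workers).map { j =>
      val sign = if (j % 2 == 0) 1.0 else -1.0
      sign * choose(workers, j) * math.pow((workers - j).toDouble / workers, shards)
    }.sum

  def main(args: Array[String]): Unit = {
    println(s"5 shards, 5 workers:  ${probAllWorkersBusy(5, 5)}")
    println(s"50 shards, 5 workers: ${probAllWorkersBusy(50, 5)}")
  }
}
```

With 50 shards the same model says that an idle worker becomes very unlikely, but it says nothing about how evenly the shards are spread, which matches the 50-shard numbers quoted further down.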

The sharding logic of the Beam IO is handy but it shouldn't be strictly 
necessary when the data is already partitioned nicely.

It seems the sharding logic is primarily necessary because there is no
notion of a worker's ID in Beam. In Flink, you can retrieve the worker
ID at runtime, so every worker can write its results directly to a
file suffixed with its worker ID. This avoids any GroupByKey or Reshuffle.
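
A minimal sketch of that idea in plain Scala, without any Beam or Flink dependency: each worker keeps the records it already holds and writes them to a file named after its own index. `WorkerLocalWrite`, `shardFileName` and `plan` are hypothetical names, and the file-name pattern is only an assumption for illustration. In a real Flink job the index would come from the runtime, for example `RuntimeContext.getIndexOfThisSubtask()`.

```scala
object WorkerLocalWrite {
  /** Hypothetical naming scheme: one file per worker index, zero-padded. */
  def shardFileName(prefix: String, workerIndex: Int, numWorkers: Int): String =
    f"$prefix-$workerIndex%05d-of-$numWorkers%05d.txt"

  /** Maps every worker's local partition to its own output file.
    * Nothing is re-keyed or shuffled: partition i stays with worker i. */
  def plan(partitions: Seq[Seq[String]], prefix: String): Map[String, Seq[String]] =
    partitions.zipWithIndex.map { case (records, idx) =>
      shardFileName(prefix, idx, partitions.size) -> records
    }.toMap

  def main(args: Array[String]): Unit = {
    // Three workers holding already-partitioned records (made-up sample data).
    val local = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))
    plan(local, "out").toSeq.sortBy(_._1).foreach { case (file, records) =>
      println(s"$file <- ${records.mkString(",")}")
    }
  }
}
```

The number of files equals the number of workers, and their sizes simply follow whatever distribution the upstream partitioning already has.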

Robert, don't we already have Reshuffle, which can be overridden? However,
it is not used by the WriteFiles code.


-Max

On 26.10.18 11:41, Robert Bradshaw wrote:
> I think it's worth adding a URN for the operation of distributing 
> "evenly" into an "appropriate" number of shards. A naive implementation 
> would add random keys and do a ReshufflePerKey, but runners could
> override this to do a reshuffle and then key by whatever notion of 
> bundle/worker/shard identifier they have that lines up with the number 
> of actual workers.
> 
> On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> 
>     Thanks for the JIRA. If I understand it correctly, runner-determined
>     sharding will avoid the extra shuffle? Will it just write the
>     worker-local data to its shard? Something similar to coalesce in Spark?
> 
>     On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mxm@apache.org> wrote:
> 
>         Oh ok, thanks for the pointer. Coming from Flink, the default is that
>         the sharding is determined by the runtime distribution. Indeed, we will
>         have to add an override to the Flink Runner, similar to this one:
> 
>         https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
> 
>         Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
> 
>         Thanks,
>         Max
> 
>         On 25.10.18 22:37, Reuven Lax wrote:
>          > FYI the Dataflow runner automatically sets the default number of shards
>          > (I believe to be 2 * num_workers). Probably we should do something
>          > similar for the Flink runner.
>          >
>          > This needs to be done by the runner, as # of workers is a runner
>          > concept; the SDK itself has no concept of workers.
>          >
>          > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>          >
>          >     If I do not specify shards for unbounded collection, I get
>          >
>          >     Caused by: java.lang.IllegalArgumentException: When applying
>          >     WriteFiles to an unbounded PCollection, must specify number of
>          >     output shards explicitly
>          >         at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>          >         at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>          >
>          >     Around the same lines in WriteFiles there is also a check for
>          >     windowed writes. I believe FileIO enables them explicitly when
>          >     windowing is present. On the filesystem, files are written per
>          >     window and shard.
>          >
>          >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org> wrote:
>          >
>          >         I agree it would be nice to keep the current distribution of
>          >         elements instead of doing a shuffle based on an artificial
>          >         shard key.
>          >
>          >         Have you tried `withWindowedWrites()`? Also, why do you say you
>          >         need to specify the number of shards in streaming mode?
>          >
>          >         -Max
>          >
>          >         On 25.10.18 10:12, Jozef Vilcek wrote:
>          >          > Hm, yes, this makes sense now, but what can be done for my
>          >          > case? I do not want to end up with too many files on disk.
>          >          >
>          >          > I think what I am looking for is a way to instruct the IO not to
>          >          > do the random sharding and reshuffle again, but to just assume
>          >          > that the number of shards equals the number of workers and that
>          >          > the shard ID is the worker ID.
>          >          > Is this doable in the Beam model?
>          >          >
>          >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
>          >          >
>          >          >     The FlinkRunner uses a hash function (MurmurHash) on each key,
>          >          >     which places keys somewhere in the hash space. The hash space
>          >          >     (2^32) is split among the partitions (5 in your case). Given
>          >          >     enough keys, the chance increases that they are evenly spread.
>          >          >
>          >          >     This should be similar to what the other Runners do.
>          >          >
>          >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>          >          >      >
>          >          >      > So if I run 5 workers with 50 shards, I end up with:
>          >          >      >
>          >          >      >   Duration   Bytes received   Records received
>          >          >      >   2m 39s     900 MB           465,525
>          >          >      >   2m 39s     1.76 GB          930,720
>          >          >      >   2m 39s     789 MB           407,315
>          >          >      >   2m 39s     1.32 GB          698,262
>          >          >      >   2m 39s     788 MB           407,310
>          >          >      >
>          >          >      > Still not good, but better than with 5 shards, where some
>          >          >      > workers did not participate at all.
>          >          >      > So, is the problem in some layer which distributes keys /
>          >          >      > shards among workers?
>          >          >      >
>          >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
>          >          >      >
>          >          >      >     withNumShards(5) generates 5 random shards. It turns out that
>          >          >      >     statistically, when you generate 5 random shards and you have 5
>          >          >      >     workers, the probability is reasonably high that some workers
>          >          >      >     will get more than one shard (and as a result not all workers
>          >          >      >     will participate). Are you able to set the number of shards
>          >          >      >     larger than 5?
>          >          >      >
>          >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>          >          >      >
>          >          >      >         cc (dev)
>          >          >      >
>          >          >      >         I tried to run the example with FlinkRunner in batch mode and
>          >          >      >         again received a bad data spread among the workers.
>          >          >      >
>          >          >      >         When I tried to remove the number of shards for batch mode in
>          >          >      >         the above example, the pipeline crashed before launch:
>          >          >      >
>          >          >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
>          >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>          >          >      >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>          >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>          >          >      >         Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>          >          >      >
>          >          >      >
>          >          >      >
>          >          >      >
>          >          >      >
>          >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>          >          >      >
>          >          >      >             Hi Max,
>          >          >      >
>          >          >      >             I forgot to mention that the example is run in streaming
>          >          >      >             mode, therefore I cannot do writes without specifying shards.
>          >          >      >             FileIO explicitly asks for them.
>          >          >      >
>          >          >      >             I am not sure where the problem is. FlinkRunner is the only
>          >          >      >             one I used.
>          >          >      >
>          >          >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mxm@apache.org> wrote:
>          >          >      >
>          >          >      >                 Hi Jozef,
>          >          >      >
>          >          >      >                 This does not look like a FlinkRunner related problem, but is
>          >          >      >                 caused by the `WriteFiles` sharding logic. It assigns keys and
>          >          >      >                 does a Reshuffle which apparently does not lead to good data
>          >          >      >                 spread in your case.
>          >          >      >
>          >          >      >                 Do you see the same behavior without `withNumShards(5)`?
>          >          >      >
>          >          >      >                 Thanks,
>          >          >      >                 Max
>          >          >      >
>          >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>          >          >      >                  > Hello,
>          >          >      >                  >
>          >          >      >                  > I am having some trouble getting balanced writes via FileIO.
>          >          >      >                  > The workers on the shuffle side, where the data fired per
>          >          >      >                  > window is written to the filesystem, receive an unbalanced
>          >          >      >                  > number of events.
>          >          >      >                  >
>          >          >      >                  > Here is a naive code example:
>          >          >      >                  >
>          >          >      >                  >     val read = KafkaIO.read()
>          >          >      >                  >         .withTopic("topic")
>          >          >      >                  >         .withBootstrapServers("kafka1:9092")
>          >          >      >                  >         .withKeyDeserializer(classOf[ByteArrayDeserializer])
>          >          >      >                  >         .withValueDeserializer(classOf[ByteArrayDeserializer])
>          >          >      >                  >         .withProcessingTime()
>          >          >      >                  >
>          >          >      >                  >     pipeline
>          >          >      >                  >         .apply(read)
>          >          >      >                  >         .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>          >          >      >                  >           override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>          >          >      >                  >             new String(input.getKV.getValue, "UTF-8")
>          >          >      >                  >           }
>          >          >      >                  >         }))
>          >          >      >                  >
>          >          >      >                  >         .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>          >          >      >                  >             .triggering(AfterWatermark.pastEndOfWindow()
>          >          >      >                  >                 .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>          >          >      >                  >                 .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>          >          >      >                  >                     Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>          >          >      >                  >                     Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>          >          >      >                  >             .discardingFiredPanes()
>          >          >      >                  >             .withAllowedLateness(Duration.standardDays(7)))
>          >          >      >                  >
>          >          >      >                  >         .apply(FileIO.write()
>          >          >      >                  >             .via(TextIO.sink())
>          >          >      >                  >             .withNaming(new SafeFileNaming(outputPath, ".txt"))
>          >          >      >                  >             .withTempDirectory(tempLocation)
>          >          >      >                  >             .withNumShards(5))
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>          >          >      >                  > number of shards), I would expect every worker to participate in
>          >          >      >                  > persisting shards, and equally so, since the code uses a fixed number of
>          >          >      >                  > shards (and random shard assignment?). But reality is different (see the 2
>          >          >      >                  > attachments: statistics from the Flink task reading from Kafka and the
>          >          >      >                  > task writing to files).
>          >          >      >                  >
>          >          >      >                  > What am I missing? How can I achieve balanced writes?
>          >          >      >                  >
>          >          >      >                  > Thanks,
>          >          >      >                  > Jozef
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >
>          >          >
>          >
> 

>          >          >      >
>          >          >      >                 This does not look like a
>         FlinkRunner
>          >         related
>          >          >     problem,
>          >          >      >                 but is caused by
>          >          >      >                 the `WriteFiles` sharding
>         logic. It
>          >         assigns keys and
>          >          >      >                 does a Reshuffle
>          >          >      >                 which apparently does not
>         lead to good
>          >         data spread in
>          >          >      >                 your case.
>          >          >      >
>          >          >      >                 Do you see the same
>         behavior without
>          >          >     `withNumShards(5)`?
>          >          >      >
>          >          >      >                 Thanks,
>          >          >      >                 Max
>          >          >      >
>          >          >      >                 On 22.10.18 11:57, Jozef
>         Vilcek wrote:
>          >          >      >                  > Hello,
>          >          >      >                  >
>          >          >      >                  > I am having some trouble
>         to get a
>          >         balanced
>          >          >     write via
>          >          >      >                 FileIO. Workers at
>          >          >      >                  > the shuffle side where
>         data per
>          >         window fire are
>          >          >      >                 written to the
>          >          >      >                  > filesystem receive
>         unbalanced
>          >         number of events.
>          >          >      >                  >
>          >          >      >                  > Here is a naive code
>         example:
>          >          >      >                  >
>          >          >      >                  >      val read =
>         KafkaIO.read()
>          >          >      >                  >          .withTopic("topic")
>          >          >      >                  >
>          >         .withBootstrapServers("kafka1:9092")
>          >          >      >                  >
>          >          >      >
>          >           .withKeyDeserializer(classOf[ByteArrayDeserializer])
>          >          >      >                  >
>          >          >      >
>          >          >     
>           .withValueDeserializer(classOf[ByteArrayDeserializer])
>          >          >      >                  >         
>         .withProcessingTime()
>          >          >      >                  >
>          >          >      >                  >      pipeline
>          >          >      >                  >          .apply(read)
>          >          >      >                  >         
>         .apply(MapElements.via(new
>          >          >      >                  >
>         SimpleFunction[KafkaRecord[Array[Byte],
>          >          >     Array[Byte]],
>          >          >      >                 String]() {
>          >          >      >                  >            override def
>         apply(input:
>          >          >      >                 KafkaRecord[Array[Byte],
>          >          >      >                  > Array[Byte]]): String = {
>          >          >      >                  >              new
>          >         String(input.getKV.getValue,
>          >          >     "UTF-8")
>          >          >      >                  >            }
>          >          >      >                  >          }))
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >
>          >          >
>          >         
>           .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>          >          >      >                  >
>          >          >     .triggering(AfterWatermark.pastEndOfWindow()
>          >          >      >                  >
>          >          >      >
>          >          >     
>           .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>          >          >      >                  >
>          >          >      >
>          >          >     
>           .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>          >          >      >                  >
>          >          >      >
>          >          >     
>           Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >
>          >          >
>          >         
>           Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>          >          >      >                  >             
>         .discardingFiredPanes()
>          >          >      >                  >
>          >          >      >
>          >           .withAllowedLateness(Duration.standardDays(7)))
>          >          >      >                  >
>          >          >      >                  >         
>         .apply(FileIO.write()
>          >          >      >                  >             
>         .via(TextIO.sink())
>          >          >      >                  >              .withNaming(new
>          >          >      >                 SafeFileNaming(outputPath,
>         ".txt"))
>          >          >      >                  >
>          >         .withTempDirectory(tempLocation)
>          >          >      >                  >             
>         .withNumShards(5))
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >                  > If I run this on Beam
>         2.6.0 with
>          >         Flink 1.5.0 on 5
>          >          >      >                 workers (equal to
>          >          >      >                  > number of shards), I
>         would expect
>          >         that each worker
>          >          >      >                 will participate on
>          >          >      >                  > persisting shards and
>         equally,
>          >         since code uses
>          >          >     fixed
>          >          >      >                 number of shards
>          >          >      >                  > (and random shard
>         assign?). But
>          >         reality is
>          >          >     different
>          >          >      >                 (see 2 attachements
>          >          >      >                  > - statistiscs from flink
>         task
>          >         reading from
>          >          >     kafka and
>          >          >      >                 task writing to files)
>          >          >      >                  >
>          >          >      >                  > What am I missing? How
>         to achieve
>          >         balanced writes?
>          >          >      >                  >
>          >          >      >                  > Thanks,
>          >          >      >                  > Jozef
>          >          >      >                  >
>          >          >      >                  >
>          >          >      >
>          >          >
>          >
> 

Re: Unbalanced FileIO writes on Flink

Posted by Robert Bradshaw <ro...@google.com>.
I think it's worth adding a URN for the operation of distributing "evenly"
into an "appropriate" number of shards. A naive implementation would add
random keys and do a ReshufflePerKey, but runners could override this to do
a reshuffle and then key by whatever notion of bundle/worker/shard
identifier they have that lines up with the number of actual workers.
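
To put rough numbers on why a handful of random shard keys tends to leave workers idle (the point made further down in the quoted thread), here is a minimal sketch in plain Scala. It assumes each shard key lands on a worker uniformly and independently, which only approximates what a real runner's hash partitioning does. `ShardSpread` and its method names are made up for illustration and are not Beam or Flink APIs.

```scala
object ShardSpread {
  // Binomial coefficient C(n, k) as a Double; fine for the small n used here.
  def choose(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  // Probability that every one of `workers` workers receives at least one of
  // `shards` shard keys, ASSUMING each key lands on a worker uniformly and
  // independently (inclusion-exclusion over the set of empty workers).
  def probAllWorkersBusy(shards: Int, workers: Int): Double =
    (0 to workers).map { j =>
      val sign = if (j % 2 == 0) 1.0 else -1.0
      sign * choose(workers, j) * math.pow(1.0 - j.toDouble / workers, shards)
    }.sum

  // Expected number of workers that receive no shard key at all,
  // under the same uniform-placement assumption.
  def expectedIdleWorkers(shards: Int, workers: Int): Double =
    workers * math.pow(1.0 - 1.0 / workers, shards)

  def main(args: Array[String]): Unit = {
    for (shards <- Seq(5, 10, 50)) {
      val busy = probAllWorkersBusy(shards, 5)
      val idle = expectedIdleWorkers(shards, 5)
      println(f"shards=$shards%3d  P(all 5 workers busy)=$busy%.4f  E[idle workers]=$idle%.4f")
    }
  }
}
```

Under that assumption, 5 shards on 5 workers keep every worker busy only about 3.8% of the time (5!/5^5), with about 1.6 workers idle on average. With 50 shards an idle worker becomes very unlikely, although this says nothing about how evenly the bytes end up spread.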

On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jo...@gmail.com> wrote:

> Thanks for the JIRA. If I understand it correctly, runner-determined
> sharding will avoid the extra shuffle? Will it just write the data that is
> locally available on a worker to its shard? Something similar to coalesce
> in Spark?
>
> On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mx...@apache.org>
> wrote:
>
>> Oh ok, thanks for the pointer. Coming from Flink, the default is that
>> the sharding is determined by the runtime distribution. Indeed, we will
>> have to add an override to the Flink Runner, similar to this one:
>>
>>
>> https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>>
>> Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>>
>> Thanks,
>> Max
>>
>> On 25.10.18 22:37, Reuven Lax wrote:
>> > FYI the Dataflow runner automatically sets the default number of shards
>> > (I believe to be 2 * num_workers). Probably we should do something
>> > similar for the Flink runner.
>> >
>> > This needs to be done by the runner, as # of workers is a runner
>> > concept; the SDK itself has no concept of workers.
>> >
>> > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>> >
>> >     If I do not specify the number of shards for an unbounded
>> >     collection, I get:
>> >
>> >     Caused by: java.lang.IllegalArgumentException: When applying
>> >     WriteFiles to an unbounded PCollection, must specify number of
>> >     output shards explicitly
>> >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>> >              at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>> >
>> >     Around the same lines in WriteFiles there is also a check for
>> >     windowed writes. I believe FileIO enables them explicitly when
>> >     windowing is present. On the filesystem, files are written per
>> >     window and shard.
>> >
>> >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org> wrote:
>> >
>> >         I agree it would be nice to keep the current distribution of
>> >         elements instead of doing a shuffle based on an artificial
>> >         shard key.
>> >
>> >         Have you tried `withWindowedWrites()`? Also, why do you say you
>> >         need to specify the number of shards in streaming mode?
>> >
>> >         -Max
>> >
>> >         On 25.10.18 10:12, Jozef Vilcek wrote:
>> >          > Hm, yes, this makes sense now, but what can be done for my
>> >          > case? I do not want to end up with too many files on disk.
>> >          >
>> >          > I think what I am looking for is a way to instruct the IO not
>> >          > to do the random sharding and reshuffle again, but to assume
>> >          > that the number of shards equals the number of workers and
>> >          > that the shard ID is the worker ID.
>> >          > Is this doable in the Beam model?
>> >          >
>> >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
>> >          >
>> >          >     The FlinkRunner applies a hash function (MurmurHash) to each
>> >          >     key, which places the key somewhere in the hash space. The
>> >          >     hash space (2^32) is split among the partitions (5 in your
>> >          >     case). Given enough keys, the chance increases that they are
>> >          >     spread equally.
>> >          >
>> >          >     This should be similar to what the other Runners do.
>> >          >
>> >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>> >          >      >
>> >          >      > So if I run 5 workers with 50 shards, I end up with:
>> >          >      >
>> >          >      > Duration   Bytes received   Records received
>> >          >      > 2m 39s     900 MB           465,525
>> >          >      > 2m 39s     1.76 GB          930,720
>> >          >      > 2m 39s     789 MB           407,315
>> >          >      > 2m 39s     1.32 GB          698,262
>> >          >      > 2m 39s     788 MB           407,310
>> >          >      >
>> >          >      > Still not good, but better than with 5 shards, where some
>> >          >      > workers did not participate at all.
>> >          >      > So, the problem is in some layer which distributes keys /
>> >          >      > shards among workers?
>> >          >      >
>> >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
>> >          >      >
>> >          >      >     withNumShards(5) generates 5 random shards. It turns out
>> >          >      >     that, statistically, when you generate 5 random shards and
>> >          >      >     you have 5 workers, the probability is reasonably high that
>> >          >      >     some workers will get more than one shard (and as a result
>> >          >      >     not all workers will participate). Are you able to set the
>> >          >      >     number of shards larger than 5?
>> >          >      >
>> >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>> >          >      >
>> >          >      >         cc (dev)
>> >          >      >
>> >          >      >         I tried to run the example with FlinkRunner in batch
>> >          >      >         mode and again got a bad data spread among the workers.
>> >          >      >
>> >          >      >         When I tried to remove the number of shards for batch
>> >          >      >         mode in the above example, the pipeline crashed before
>> >          >      >         launch:
>> >          >      >
>> >          >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
>> >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>> >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>> >          >      >
>> >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>> >          >      >
>> >          >      >             Hi Max,
>> >          >      >
>> >          >      >             I forgot to mention that the example is run in
>> >          >      >             streaming mode, therefore I cannot do writes without
>> >          >      >             specifying shards. FileIO explicitly asks for them.
>> >          >      >
>> >          >      >             I am not sure where the problem is. FlinkRunner is
>> >          >      >             the only one I used.
>> >          >      >
>> >          >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mxm@apache.org> wrote:
>> >          >      >
>> >          >      >                 Hi Jozef,
>> >          >      >
>> >          >      >                 This does not look like a FlinkRunner-related
>> >          >      >                 problem, but is caused by the `WriteFiles` sharding
>> >          >      >                 logic. It assigns keys and does a Reshuffle, which
>> >          >      >                 apparently does not lead to a good data spread in
>> >          >      >                 your case.
>> >          >      >
>> >          >      >                 Do you see the same behavior without
>> >          >      >                 `withNumShards(5)`?
>> >          >      >
>> >          >      >                 Thanks,
>> >          >      >                 Max
>> >          >      >
>> >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>> >          >      >                  > Hello,
>> >          >      >                  >
>> >          >      >                  > I am having some trouble getting a balanced write via
>> >          >      >                  > FileIO. Workers on the shuffle side, where the data for
>> >          >      >                  > each window firing is written to the filesystem, receive
>> >          >      >                  > an unbalanced number of events.
>> >          >      >                  >
>> >          >      >                  > Here is a naive code example:
>> >          >      >                  >
>> >          >      >                  >      val read = KafkaIO.read()
>> >          >      >                  >          .withTopic("topic")
>> >          >      >                  >          .withBootstrapServers("kafka1:9092")
>> >          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >          >      >                  >          .withProcessingTime()
>> >          >      >                  >
>> >          >      >                  >      pipeline
>> >          >      >                  >          .apply(read)
>> >          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>> >          >      >                  >              new String(input.getKV.getValue, "UTF-8")
>> >          >      >                  >            }
>> >          >      >                  >          }))
>> >          >      >                  >
>> >          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >          >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
>> >          >      >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >          >      >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >          >      >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >          >      >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>> >          >      >                  >              .discardingFiredPanes()
>> >          >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
>> >          >      >                  >
>> >          >      >                  >          .apply(FileIO.write()
>> >          >      >                  >              .via(TextIO.sink())
>> >          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >          >      >                  >              .withTempDirectory(tempLocation)
>> >          >      >                  >              .withNumShards(5))
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers
>> >          >      >                  > (equal to the number of shards), I would expect each worker
>> >          >      >                  > to participate equally in persisting shards, since the code
>> >          >      >                  > uses a fixed number of shards (and random shard assignment?).
>> >          >      >                  > But reality is different (see the 2 attachments: statistics
>> >          >      >                  > from the Flink task reading from Kafka and from the task
>> >          >      >                  > writing to files).
>> >          >      >                  >
>> >          >      >                  > What am I missing? How can I achieve balanced writes?
>> >          >      >                  >
>> >          >      >                  > Thanks,
>> >          >      >                  > Jozef
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >
>> >          >
>> >
>>
>
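
For reference, the hash-range partitioning described in the quoted thread (hash each key, split the 2^32 hash space among the partitions) can be sketched roughly as below. This is an illustration only and not Flink's actual key assignment: the use of `scala.util.hashing.MurmurHash3` on the shard number as a string, the equal-slice split, and the names `HashPartitionSketch`, `partitionOf` and `shardsPerPartition` are all assumptions made for the example.

```scala
import scala.util.hashing.MurmurHash3

object HashPartitionSketch {
  // Map a 32-bit hash onto one of `partitions` equal slices of the 2^32 hash space.
  def partitionOf(hash: Int, partitions: Int): Int = {
    val unsigned = hash.toLong & 0xFFFFFFFFL   // reinterpret as 0 .. 2^32 - 1
    ((unsigned * partitions) >>> 32).toInt     // slice index in 0 until partitions
  }

  // Count how many of the shard keys 0 until numShards land on each partition.
  // Hashing the shard number as a string is an arbitrary choice for this sketch.
  def shardsPerPartition(numShards: Int, partitions: Int): Vector[Int] = {
    val counts = Array.fill(partitions)(0)
    for (shard <- 0 until numShards) {
      val hash = MurmurHash3.stringHash(shard.toString)
      counts(partitionOf(hash, partitions)) += 1
    }
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    for (numShards <- Seq(5, 50, 500))
      println(s"shards=$numShards -> per partition: ${shardsPerPartition(numShards, 5).mkString(", ")}")
  }
}
```

Running it for a few shard counts shows how many shard keys each of 5 partitions would own under these assumptions. With only as many keys as partitions there is no guarantee that every partition owns one, which is the effect discussed above.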

>> >         <mailto:mxm@apache.org <ma...@apache.org>>
>> >          >     <mailto:mxm@apache.org <ma...@apache.org>
>> >         <mailto:mxm@apache.org <ma...@apache.org>>>> wrote:
>> >          >      >
>> >          >      >                 Hi Jozef,
>> >          >      >
>> >          >      >                 This does not look like a FlinkRunner
>> >         related
>> >          >     problem,
>> >          >      >                 but is caused by
>> >          >      >                 the `WriteFiles` sharding logic. It
>> >         assigns keys and
>> >          >      >                 does a Reshuffle
>> >          >      >                 which apparently does not lead to good
>> >         data spread in
>> >          >      >                 your case.
>> >          >      >
>> >          >      >                 Do you see the same behavior without
>> >          >     `withNumShards(5)`?
>> >          >      >
>> >          >      >                 Thanks,
>> >          >      >                 Max
>> >          >      >
>> >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>> >          >      >                  > Hello,
>> >          >      >                  >
>> >          >      >                  > I am having some trouble to get a
>> >         balanced
>> >          >     write via
>> >          >      >                 FileIO. Workers at
>> >          >      >                  > the shuffle side where data per
>> >         window fire are
>> >          >      >                 written to the
>> >          >      >                  > filesystem receive unbalanced
>> >         number of events.
>> >          >      >                  >
>> >          >      >                  > Here is a naive code example:
>> >          >      >                  >
>> >          >      >                  >      val read = KafkaIO.read()
>> >          >      >                  >          .withTopic("topic")
>> >          >      >                  >
>> >         .withBootstrapServers("kafka1:9092")
>> >          >      >                  >
>> >          >      >
>> >           .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >          >      >                  >
>> >          >      >
>> >          >       .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >          >      >                  >          .withProcessingTime()
>> >          >      >                  >
>> >          >      >                  >      pipeline
>> >          >      >                  >          .apply(read)
>> >          >      >                  >          .apply(MapElements.via(new
>> >          >      >                  >
>> SimpleFunction[KafkaRecord[Array[Byte],
>> >          >     Array[Byte]],
>> >          >      >                 String]() {
>> >          >      >                  >            override def
>> apply(input:
>> >          >      >                 KafkaRecord[Array[Byte],
>> >          >      >                  > Array[Byte]]): String = {
>> >          >      >                  >              new
>> >         String(input.getKV.getValue,
>> >          >     "UTF-8")
>> >          >      >                  >            }
>> >          >      >                  >          }))
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >
>> >          >
>> >
>>  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >          >      >                  >
>> >          >     .triggering(AfterWatermark.pastEndOfWindow()
>> >          >      >                  >
>> >          >      >
>> >          >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >          >      >                  >
>> >          >      >
>> >          >
>>  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >          >      >                  >
>> >          >      >
>> >          >
>>  Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >
>> >          >
>> >
>>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>> >          >      >                  >
>> .discardingFiredPanes()
>> >          >      >                  >
>> >          >      >
>> >           .withAllowedLateness(Duration.standardDays(7)))
>> >          >      >                  >
>> >          >      >                  >          .apply(FileIO.write()
>> >          >      >                  >              .via(TextIO.sink())
>> >          >      >                  >              .withNaming(new
>> >          >      >                 SafeFileNaming(outputPath, ".txt"))
>> >          >      >                  >
>> >         .withTempDirectory(tempLocation)
>> >          >      >                  >              .withNumShards(5))
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >                  > If I run this on Beam 2.6.0 with
>> >         Flink 1.5.0 on 5
>> >          >      >                 workers (equal to
>> >          >      >                  > number of shards), I would expect
>> >         that each worker
>> >          >      >                 will participate on
>> >          >      >                  > persisting shards and equally,
>> >         since code uses
>> >          >     fixed
>> >          >      >                 number of shards
>> >          >      >                  > (and random shard assign?). But
>> >         reality is
>> >          >     different
>> >          >      >                 (see 2 attachments
>> >          >      >                  > - statistics from Flink task
>> >         reading from
>> >          >     kafka and
>> >          >      >                 task writing to files)
>> >          >      >                  >
>> >          >      >                  > What am I missing? How to achieve
>> >         balanced writes?
>> >          >      >                  >
>> >          >      >                  > Thanks,
>> >          >      >                  > Jozef
>> >          >      >                  >
>> >          >      >                  >
>> >          >      >
>> >          >
>> >
>>
>

Re: Unbalanced FileIO writes on Flink

Posted by Jozef Vilcek <jo...@gmail.com>.
Thanks for the JIRA. If I understand it correctly, runner-determined
sharding will avoid the extra shuffle? Will it just write the data available
locally on each worker to its own shard, similar to coalesce in Spark?
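
For context on why a fixed withNumShards(5) tends to leave workers idle,
here is a rough sketch of the arithmetic behind Reuven's remark quoted
further down. It assumes each shard key lands on a worker independently and
uniformly at random, which simplifies the hash-based partitioning described
in the thread. The class and method names are made up for illustration and
are not part of Beam or Flink.

```java
public class ShardSpread {

    /**
     * Probability that every worker receives at least one shard, assuming
     * (hypothetically) that each shard is placed on a worker independently
     * and uniformly at random.
     */
    public static double probAllWorkersUsed(int workers, int shards) {
        // Inclusion-exclusion over the number of workers forced to stay empty.
        double p = 0.0;
        for (int j = 0; j <= workers; j++) {
            double term = binomial(workers, j)
                    * Math.pow((double) (workers - j) / workers, shards);
            p += (j % 2 == 0) ? term : -term;
        }
        return p;
    }

    /** Binomial coefficient C(n, k), computed in floating point. */
    static double binomial(int n, int k) {
        double r = 1.0;
        for (int i = 1; i <= k; i++) {
            r = r * (n - k + i) / i;
        }
        return r;
    }

    public static void main(String[] args) {
        System.out.printf("5 shards on 5 workers:  %.4f%n", probAllWorkersUsed(5, 5));
        System.out.printf("50 shards on 5 workers: %.4f%n", probAllWorkersUsed(5, 50));
    }
}
```

Under that assumption, 5 shards on 5 workers keep all workers busy with
probability 5!/5^5, roughly 0.038, so idle workers are the expected outcome
rather than the exception. More shards make idle workers unlikely, but do not
by themselves guarantee an even split.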

On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mx...@apache.org> wrote:

> Oh ok, thanks for the pointer. Coming from Flink, the default is that
> the sharding is determined by the runtime distribution. Indeed, we will
> have to add an override to the Flink Runner, similar to this one:
>
>
> https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>
> Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>
> Thanks,
> Max
>
> On 25.10.18 22:37, Reuven Lax wrote:
> > FYI the Dataflow runner automatically sets the default number of shards
> > (I believe to be 2 * num_workers). Probably we should do something
> > similar for the Flink runner.
> >
> > This needs to be done by the runner, as # of workers is a runner
> > concept; the SDK itself has no concept of workers.
> >
> > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     If I do not specify shards for unbounded collection, I get
> >
> >     Caused by: java.lang.IllegalArgumentException: When applying
> >     WriteFiles to an unbounded PCollection, must specify number of
> >     output shards explicitly
> >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> >              at
> >     org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> >
> >     Around the same lines in WriteFiles there is also a check for
> >     windowed writes. I believe FileIO enables them explicitly when
> >     windowing is present. On the filesystem, files are written per
> >     window and shard.
> >
> >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org
> >     <ma...@apache.org>> wrote:
> >
> >         I agree it would be nice to keep the current distribution of
> >         elements
> >         instead of doing a shuffle based on an artificial shard key.
> >
> >         Have you tried `withWindowedWrites()`? Also, why do you say you
> >         need to
> >         specify the number of shards in streaming mode?
> >
> >         -Max
> >
> >         On 25.10.18 10:12, Jozef Vilcek wrote:
> >          > Hm, yes, this makes sense now, but what can be done for my
> >         case? I do
> >          > not want to end up with too many files on disk.
> >          >
> >          > I think what I am looking for is a way to instruct the IO not to
> >          > do random sharding and a reshuffle again, but to assume that the
> >          > number of shards equals the number of workers and that the shard
> >          > ID is the worker ID.
> >          > Is this doable in the Beam model?
> >          >
> >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels
> >         <mxm@apache.org <ma...@apache.org>
> >          > <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
> >          >
> >          >     The FlinkRunner uses a hash function (MurmurHash) on each
> >         key which
> >          >     places keys somewhere in the hash space. The hash space
> >         (2^32) is split
> >          >     among the partitions (5 in your case). Given enough keys,
> >         the chance
> >          >     increases they are equally spread.
> >          >
> >          >     This should be similar to what the other Runners do.
> >          >
> >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
> >          >      >
> >          >      > So if I run 5 workers with 50 shards, I end up with:
> >          >      >
> >          >      > Duration   Bytes received   Records received
> >          >      >   2m 39s        900 MB            465,525
> >          >      >   2m 39s       1.76 GB            930,720
> >          >      >   2m 39s        789 MB            407,315
> >          >      >   2m 39s       1.32 GB            698,262
> >          >      >   2m 39s        788 MB            407,310
> >          >      >
> >          >      > Still not good but better than with 5 shards where
> >         some workers
> >          >     did not
> >          >      > participate at all.
> >          >      > So, problem is in some layer which distributes keys /
> >         shards
> >          >     among workers?
> >          >      >
> >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax
> >         <relax@google.com <ma...@google.com>
> >          >     <mailto:relax@google.com <ma...@google.com>>
> >          >      > <mailto:relax@google.com <ma...@google.com>
> >         <mailto:relax@google.com <ma...@google.com>>>> wrote:
> >          >      >
> >          >      >     withNumShards(5) generates 5 random shards. It
> >         turns out that
> >          >      >     statistically when you generate 5 random shards
> >         and you have 5
> >          >      >     workers, the probability is reasonably high that
> >         some workers
> >          >     will get
> >          >      >     more than one shard (and as a result not all
> >         workers will
> >          >      >     participate). Are you able to set the number of
> >         shards larger
> >          >     than 5?
> >          >      >
> >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
> >          >     <jozo.vilcek@gmail.com <ma...@gmail.com>
> >         <mailto:jozo.vilcek@gmail.com <ma...@gmail.com>>
> >          >      >     <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>
> >          >     <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>>>> wrote:
> >          >      >
> >          >      >         cc (dev)
> >          >      >
> >          >      >         I tried to run the example with FlinkRunner in
> >         batch mode and
> >          >      >         received again bad data spread among the
> workers.
> >          >      >
> >          >      >         When I tried to remove number of shards for
> >         batch mode in
> >          >     above
> >          >      >         example, pipeline crashed before launch
> >          >      >
> >          >      >         Caused by: java.lang.IllegalStateException:
> >         Inputs to Flatten
> >          >      >         had incompatible triggers:
> >          >      >
> >          >
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> >          >      >         entCountAtLeast(10000)),
> >          >      >
> >          >
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> >          >      >         hour)))),
> >          >      >
> >          >
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> >          >      >         rever(AfterPane.elementCountAtLeast(1)),
> >          >      >
> >          >
> >
>  Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >          >      >
> >          >      >
> >          >      >
> >          >      >
> >          >      >
> >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> >          >      >         <jozo.vilcek@gmail.com
> >         <ma...@gmail.com> <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>>
> >          >     <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com> <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>>>> wrote:
> >          >      >
> >          >      >             Hi Max,
> >          >      >
> >          >      >             I forgot to mention that example is run in
> >         streaming
> >          >     mode,
> >          >      >             therefore I can not do writes without
> >         specifying shards.
> >          >      >             FileIO explicitly asks for them.
> >          >      >
> >          >      >             I am not sure where the problem is.
> >         FlinkRunner is
> >          >     only one
> >          >      >             I used.
> >          >      >
> >          >      >             On Tue, Oct 23, 2018 at 11:27 AM
> >         Maximilian Michels
> >          >      >             <mxm@apache.org <ma...@apache.org>
> >         <mailto:mxm@apache.org <ma...@apache.org>>
> >          >     <mailto:mxm@apache.org <ma...@apache.org>
> >         <mailto:mxm@apache.org <ma...@apache.org>>>> wrote:
> >          >      >
> >          >      >                 Hi Jozef,
> >          >      >
> >          >      >                 This does not look like a FlinkRunner
> >         related
> >          >     problem,
> >          >      >                 but is caused by
> >          >      >                 the `WriteFiles` sharding logic. It
> >         assigns keys and
> >          >      >                 does a Reshuffle
> >          >      >                 which apparently does not lead to good
> >         data spread in
> >          >      >                 your case.
> >          >      >
> >          >      >                 Do you see the same behavior without
> >          >     `withNumShards(5)`?
> >          >      >
> >          >      >                 Thanks,
> >          >      >                 Max
> >          >      >
> >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
> >          >      >                  > Hello,
> >          >      >                  >
> >          >      >                  > I am having some trouble to get a
> >         balanced
> >          >     write via
> >          >      >                 FileIO. Workers at
> >          >      >                  > the shuffle side where data per
> >         window fire are
> >          >      >                 written to the
> >          >      >                  > filesystem receive unbalanced
> >         number of events.
> >          >      >                  >
> >          >      >                  > Here is a naive code example:
> >          >      >                  >
> >          >      >                  >      val read = KafkaIO.read()
> >          >      >                  >          .withTopic("topic")
> >          >      >                  >
> >         .withBootstrapServers("kafka1:9092")
> >          >      >                  >
> >          >      >
> >           .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >          >      >                  >
> >          >      >
> >          >       .withValueDeserializer(classOf[ByteArrayDeserializer])
> >          >      >                  >          .withProcessingTime()
> >          >      >                  >
> >          >      >                  >      pipeline
> >          >      >                  >          .apply(read)
> >          >      >                  >          .apply(MapElements.via(new
> >          >      >                  >
> SimpleFunction[KafkaRecord[Array[Byte],
> >          >     Array[Byte]],
> >          >      >                 String]() {
> >          >      >                  >            override def apply(input:
> >          >      >                 KafkaRecord[Array[Byte],
> >          >      >                  > Array[Byte]]): String = {
> >          >      >                  >              new
> >         String(input.getKV.getValue,
> >          >     "UTF-8")
> >          >      >                  >            }
> >          >      >                  >          }))
> >          >      >                  >
> >          >      >                  >
> >          >      >                  >
> >          >      >
> >          >
> >
>  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >          >      >                  >
> >          >     .triggering(AfterWatermark.pastEndOfWindow()
> >          >      >                  >
> >          >      >
> >          >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >          >      >                  >
> >          >      >
> >          >
>  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >          >      >                  >
> >          >      >
> >          >
>  Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >          >      >                  >
> >          >      >                  >
> >          >      >
> >          >
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >          >      >                  >              .discardingFiredPanes()
> >          >      >                  >
> >          >      >
> >           .withAllowedLateness(Duration.standardDays(7)))
> >          >      >                  >
> >          >      >                  >          .apply(FileIO.write()
> >          >      >                  >              .via(TextIO.sink())
> >          >      >                  >              .withNaming(new
> >          >      >                 SafeFileNaming(outputPath, ".txt"))
> >          >      >                  >
> >         .withTempDirectory(tempLocation)
> >          >      >                  >              .withNumShards(5))
> >          >      >                  >
> >          >      >                  >
> >          >      >                  > If I run this on Beam 2.6.0 with
> >         Flink 1.5.0 on 5
> >          >      >                 workers (equal to
> >          >      >                  > number of shards), I would expect
> >         that each worker
> >          >      >                 will participate on
> >          >      >                  > persisting shards and equally,
> >         since code uses
> >          >     fixed
> >          >      >                 number of shards
> >          >      >                  > (and random shard assign?). But
> >         reality is
> >          >     different
> >          >      >                 (see 2 attachments
> >          >      >                  > - statistics from Flink task
> >         reading from
> >          >     kafka and
> >          >      >                 task writing to files)
> >          >      >                  >
> >          >      >                  > What am I missing? How to achieve
> >         balanced writes?
> >          >      >                  >
> >          >      >                  > Thanks,
> >          >      >                  > Jozef
> >          >      >                  >
> >          >      >                  >
> >          >      >
> >          >
> >
>
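
The per-worker record counts from the 50-shard run quoted in this thread can
be turned into a rough estimate of how many shards each worker handled. This
is only a sketch: it assumes every shard receives about the same number of
records, and the class and method names are hypothetical.

```java
import java.util.Arrays;

public class ImpliedShards {

    /**
     * Estimates the number of shards handled by each worker from the records
     * it received, assuming all shards carry roughly the same record count.
     */
    public static long[] impliedShardsPerWorker(long[] recordsPerWorker, int totalShards) {
        long total = 0;
        for (long r : recordsPerWorker) {
            total += r;
        }
        double recordsPerShard = (double) total / totalShards;
        long[] shards = new long[recordsPerWorker.length];
        for (int i = 0; i < recordsPerWorker.length; i++) {
            shards[i] = Math.round(recordsPerWorker[i] / recordsPerShard);
        }
        return shards;
    }

    public static void main(String[] args) {
        // "Records received" column from the run with 5 workers and 50 shards.
        long[] records = {465_525L, 930_720L, 407_315L, 698_262L, 407_310L};
        System.out.println(Arrays.toString(impliedShardsPerWorker(records, 50)));
    }
}
```

Under that assumption the estimate comes out as 8, 16, 7, 12 and 7 shards,
which adds up to 50. That would mean the skew comes from how many shard keys
each worker happened to receive, not from uneven shard sizes.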

Re: Unbalanced FileIO writes on Flink

Posted by Jozef Vilcek <jo...@gmail.com>.
Thanks for the JIRA. If I understand it correctly, runner-determined
sharding will avoid the extra shuffle? Will it just write the data available
locally on each worker to its own shard, similar to coalesce in Spark?

On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <mx...@apache.org> wrote:

> Oh ok, thanks for the pointer. Coming from Flink, the default is that
> the sharding is determined by the runtime distribution. Indeed, we will
> have to add an override to the Flink Runner, similar to this one:
>
>
> https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>
> Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>
> Thanks,
> Max
>
> On 25.10.18 22:37, Reuven Lax wrote:
> > FYI the Dataflow runner automatically sets the default number of shards
> > (I believe to be 2 * num_workers). Probably we should do something
> > similar for the Flink runner.
> >
> > This needs to be done by the runner, as # of workers is a runner
> > concept; the SDK itself has no concept of workers.
> >
> > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     If I do not specify shards for unbounded collection, I get
> >
> >     Caused by: java.lang.IllegalArgumentException: When applying
> >     WriteFiles to an unbounded PCollection, must specify number of
> >     output shards explicitly
> >              at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> >              at
> >     org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> >
> >     Around the same lines in WriteFiles there is also a check for
> >     windowed writes. I believe FileIO enables them explicitly when
> >     windowing is present. On the filesystem, files are written per
> >     window and shard.
> >
> >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org
> >     <ma...@apache.org>> wrote:
> >
> >         I agree it would be nice to keep the current distribution of
> >         elements
> >         instead of doing a shuffle based on an artificial shard key.
> >
> >         Have you tried `withWindowedWrites()`? Also, why do you say you
> >         need to
> >         specify the number of shards in streaming mode?
> >
> >         -Max
> >
> >         On 25.10.18 10:12, Jozef Vilcek wrote:
> >          > Hm, yes, this makes sense now, but what can be done for my
> >         case? I do
> >          > not want to end up with too many files on disk.
> >          >
> >          > I think what I am looking for is a way to instruct the IO not to
> >          > do random sharding and a reshuffle again, but to assume that the
> >          > number of shards equals the number of workers and that the shard
> >          > ID is the worker ID.
> >          > Is this doable in the Beam model?
> >          >
> >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels
> >         <mxm@apache.org <ma...@apache.org>
> >          > <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
> >          >
> >          >     The FlinkRunner uses a hash function (MurmurHash) on each
> >         key which
> >          >     places keys somewhere in the hash space. The hash space
> >         (2^32) is split
> >          >     among the partitions (5 in your case). Given enough keys,
> >         the chance
> >          >     increases they are equally spread.
> >          >
> >          >     This should be similar to what the other Runners do.
> >          >
> >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
> >          >      >
> >          >      > So if I run 5 workers with 50 shards, I end up with:
> >          >      >
> >          >      > Duration   Bytes received   Records received
> >          >      >   2m 39s        900 MB            465,525
> >          >      >   2m 39s       1.76 GB            930,720
> >          >      >   2m 39s        789 MB            407,315
> >          >      >   2m 39s       1.32 GB            698,262
> >          >      >   2m 39s        788 MB            407,310
> >          >      >
> >          >      > Still not good but better than with 5 shards where
> >         some workers
> >          >     did not
> >          >      > participate at all.
> >          >      > So, problem is in some layer which distributes keys /
> >         shards
> >          >     among workers?
> >          >      >
> >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax
> >         <relax@google.com <ma...@google.com>
> >          >     <mailto:relax@google.com <ma...@google.com>>
> >          >      > <mailto:relax@google.com <ma...@google.com>
> >         <mailto:relax@google.com <ma...@google.com>>>> wrote:
> >          >      >
> >          >      >     withNumShards(5) generates 5 random shards. It
> >         turns out that
> >          >      >     statistically when you generate 5 random shards
> >         and you have 5
> >          >      >     workers, the probability is reasonably high that
> >         some workers
> >          >     will get
> >          >      >     more than one shard (and as a result not all
> >         workers will
> >          >      >     participate). Are you able to set the number of
> >         shards larger
> >          >     than 5?
> >          >      >
> >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
> >          >     <jozo.vilcek@gmail.com <ma...@gmail.com>
> >         <mailto:jozo.vilcek@gmail.com <ma...@gmail.com>>
> >          >      >     <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>
> >          >     <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>>>> wrote:
> >          >      >
> >          >      >         cc (dev)
> >          >      >
> >          >      >         I tried to run the example with FlinkRunner in
> >         batch mode and
> >          >      >         received again bad data spread among the
> workers.
> >          >      >
> >          >      >         When I tried to remove number of shards for
> >         batch mode in
> >          >     above
> >          >      >         example, pipeline crashed before launch
> >          >      >
> >          >      >         Caused by: java.lang.IllegalStateException:
> >         Inputs to Flatten
> >          >      >         had incompatible triggers:
> >          >      >
> >          >
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> >          >      >         entCountAtLeast(10000)),
> >          >      >
> >          >
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> >          >      >         hour)))),
> >          >      >
> >          >
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> >          >      >         rever(AfterPane.elementCountAtLeast(1)),
> >          >      >
> >          >
> >
>  Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >          >      >
> >          >      >
> >          >      >
> >          >      >
> >          >      >
> >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> >          >      >         <jozo.vilcek@gmail.com
> >         <ma...@gmail.com> <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>>
> >          >     <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com> <mailto:jozo.vilcek@gmail.com
> >         <ma...@gmail.com>>>> wrote:
> >          >      >
> >          >      >             Hi Max,
> >          >      >
> >          >      >             I forgot to mention that example is run in
> >         streaming
> >          >     mode,
> >          >      >             therefore I can not do writes without
> >         specifying shards.
> >          >      >             FileIO explicitly asks for them.
> >          >      >
> >          >      >             I am not sure where the problem is.
> >         FlinkRunner is
> >          >     only one
> >          >      >             I used.
> >          >      >
> >          >      >             On Tue, Oct 23, 2018 at 11:27 AM
> >         Maximilian Michels
> >          >      >             <mxm@apache.org <ma...@apache.org>
> >         <mailto:mxm@apache.org <ma...@apache.org>>
> >          >     <mailto:mxm@apache.org <ma...@apache.org>
> >         <mailto:mxm@apache.org <ma...@apache.org>>>> wrote:
> >          >      >
> >          >      >                 Hi Jozef,
> >          >      >
> >          >      >                 This does not look like a FlinkRunner related problem,
> >          >      >                 but is caused by the `WriteFiles` sharding logic. It
> >          >      >                 assigns keys and does a Reshuffle which apparently does
> >          >      >                 not lead to good data spread in your case.
> >          >      >
> >          >      >                 Do you see the same behavior without `withNumShards(5)`?
> >          >      >
> >          >      >                 Thanks,
> >          >      >                 Max
> >          >      >
> >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
> >          >      >                  > Hello,
> >          >      >                  >
> >          >      >                  > I am having some trouble getting a balanced write via
> >          >      >                  > FileIO. Workers on the shuffle side, where the data fired
> >          >      >                  > per window is written to the filesystem, receive an
> >          >      >                  > unbalanced number of events.
> >          >      >                  >
> >          >      >                  > Here is a naive code example:
> >          >      >                  >
> >          >      >                  >      val read = KafkaIO.read()
> >          >      >                  >          .withTopic("topic")
> >          >      >                  >          .withBootstrapServers("kafka1:9092")
> >          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
> >          >      >                  >          .withProcessingTime()
> >          >      >                  >
> >          >      >                  >      pipeline
> >          >      >                  >          .apply(read)
> >          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> >          >      >                  >              new String(input.getKV.getValue, "UTF-8")
> >          >      >                  >            }
> >          >      >                  >          }))
> >          >      >                  >
> >          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >          >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
> >          >      >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >          >      >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >          >      >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >          >      >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >          >      >                  >              .discardingFiredPanes()
> >          >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
> >          >      >                  >
> >          >      >                  >          .apply(FileIO.write()
> >          >      >                  >              .via(TextIO.sink())
> >          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >          >      >                  >              .withTempDirectory(tempLocation)
> >          >      >                  >              .withNumShards(5))
> >          >      >                  >
> >          >      >                  >
> >          >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers
> >          >      >                  > (equal to the number of shards), I would expect each worker
> >          >      >                  > to participate equally in persisting shards, since the code
> >          >      >                  > uses a fixed number of shards (and random shard assignment?).
> >          >      >                  > But reality is different (see the 2 attachments: statistics
> >          >      >                  > from the Flink task reading from Kafka and the task writing
> >          >      >                  > to files).
> >          >      >                  >
> >          >      >                  > What am I missing? How can I achieve balanced writes?
> >          >      >                  >
> >          >      >                  > Thanks,
> >          >      >                  > Jozef
> >          >      >                  >
> >          >      >                  >
> >          >      >
> >          >
> >
>

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
Oh ok, thanks for the pointer. Coming from Flink, the default is that
the sharding is determined by the runtime distribution. Indeed, we will
have to add an override to the Flink Runner, similar to this one:

https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347

Jira issue: https://issues.apache.org/jira/browse/BEAM-5865

Thanks,
Max

On 25.10.18 22:37, Reuven Lax wrote:
> FYI the Dataflow runner automatically sets the default number of shards 
> (I believe to be 2 * num_workers). Probably we should do something 
> similar for the Flink runner.
> 
> This needs to be done by the runner, as # of workers is a runner 
> concept; the SDK itself has no concept of workers.
> 
> On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
> 
>     If I do not specify shards for unbounded collection, I get
> 
>     Caused by: java.lang.IllegalArgumentException: When applying
>     WriteFiles to an unbounded PCollection, must specify number of
>     output shards explicitly
>              at
>     org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>              at
>     org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> 
>     Around the same lines in WriteFiles there is also a check for windowed
>     writes. I believe FileIO enables them explicitly when windowing is
>     present. On the filesystem, files are written per window and shard.
> 
>     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org> wrote:
> 
>         I agree it would be nice to keep the current distribution of elements
>         instead of doing a shuffle based on an artificial shard key.
> 
>         Have you tried `withWindowedWrites()`? Also, why do you say you need to
>         specify the number of shards in streaming mode?
> 
>         -Max
> 
>         On 25.10.18 10:12, Jozef Vilcek wrote:
>          > Hm, yes, this makes sense now, but what can be done for my case? I do
>          > not want to end up with too many files on disk.
>          >
>          > I think what I am looking for is a way to instruct the IO not to do
>          > the random sharding and reshuffle again, but to assume that the number
>          > of shards equals the number of workers and that the shard ID is the
>          > worker ID. Is this doable in the Beam model?
>          >
>          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org> wrote:
>          >
>          >     The FlinkRunner uses a hash function (MurmurHash) on each key, which
>          >     places keys somewhere in the hash space. The hash space (2^32) is split
>          >     among the partitions (5 in your case). Given enough keys, the chance
>          >     increases that they are equally spread.
>          >
>          >     This should be similar to what the other Runners do.
>          >
>          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>          >      >
>          >      > So if I run 5 workers with 50 shards, I end up with:
>          >      >
>          >      > Duration     Bytes received     Records received
>          >      >   2m 39s        900 MB            465,525
>          >      >   2m 39s       1.76 GB            930,720
>          >      >   2m 39s        789 MB            407,315
>          >      >   2m 39s       1.32 GB            698,262
>          >      >   2m 39s        788 MB            407,310
>          >      >
>          >      > Still not good, but better than with 5 shards, where some workers did
>          >      > not participate at all.
>          >      > So the problem is in some layer which distributes keys / shards among
>          >      > workers?
>          >      >
>          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com> wrote:
>          >      >
>          >      >     withNumShards(5) generates 5 random shards. It turns out that
>          >      >     statistically, when you generate 5 random shards and you have 5
>          >      >     workers, the probability is reasonably high that some workers will
>          >      >     get more than one shard (and as a result not all workers will
>          >      >     participate). Are you able to set the number of shards larger than 5?
>          >      >
>          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>          >      >
>          >      >         cc (dev)
>          >      >
>          >      >         I tried to run the example with FlinkRunner in batch mode and
>          >      >         again got a bad data spread among the workers.
>          >      >
>          >      >         When I tried to remove the number of shards for batch mode in the
>          >      >         above example, the pipeline crashed before launch:
>          >      >
>          >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
>          >      >         had incompatible triggers:
>          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>          >      >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>          >      >         Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>          >      >
>          >      >
>          >      >
>          >      >
>          >      >
>          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:
>          >      >
>          >      >             Hi Max,
>          >      >
>          >      >             I forgot to mention that the example is run in streaming mode,
>          >      >             therefore I cannot do writes without specifying shards.
>          >      >             FileIO explicitly asks for them.
>          >      >
>          >      >             I am not sure where the problem is. FlinkRunner is the only
>          >      >             one I used.
>          >      >
>          >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mxm@apache.org> wrote:
>          >      >
>          >      >                 Hi Jozef,
>          >      >
>          >      >                 This does not look like a FlinkRunner related problem,
>          >      >                 but is caused by the `WriteFiles` sharding logic. It
>          >      >                 assigns keys and does a Reshuffle which apparently does
>          >      >                 not lead to good data spread in your case.
>          >      >
>          >      >                 Do you see the same behavior without `withNumShards(5)`?
>          >      >
>          >      >                 Thanks,
>          >      >                 Max
>          >      >
>          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>          >      >                  > Hello,
>          >      >                  >
>          >      >                  > I am having some trouble getting a balanced write via
>          >      >                  > FileIO. Workers on the shuffle side, where the data fired
>          >      >                  > per window is written to the filesystem, receive an
>          >      >                  > unbalanced number of events.
>          >      >                  >
>          >      >                  > Here is a naive code example:
>          >      >                  >
>          >      >                  >      val read = KafkaIO.read()
>          >      >                  >          .withTopic("topic")
>          >      >                  >          .withBootstrapServers("kafka1:9092")
>          >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>          >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>          >      >                  >          .withProcessingTime()
>          >      >                  >
>          >      >                  >      pipeline
>          >      >                  >          .apply(read)
>          >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>          >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>          >      >                  >              new String(input.getKV.getValue, "UTF-8")
>          >      >                  >            }
>          >      >                  >          }))
>          >      >                  >
>          >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>          >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
>          >      >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>          >      >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>          >      >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>          >      >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>          >      >                  >              .discardingFiredPanes()
>          >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
>          >      >                  >
>          >      >                  >          .apply(FileIO.write()
>          >      >                  >              .via(TextIO.sink())
>          >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>          >      >                  >              .withTempDirectory(tempLocation)
>          >      >                  >              .withNumShards(5))
>          >      >                  >
>          >      >                  >
>          >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers
>          >      >                  > (equal to the number of shards), I would expect each worker
>          >      >                  > to participate equally in persisting shards, since the code
>          >      >                  > uses a fixed number of shards (and random shard assignment?).
>          >      >                  > But reality is different (see the 2 attachments: statistics
>          >      >                  > from the Flink task reading from Kafka and the task writing
>          >      >                  > to files).
>          >      >                  >
>          >      >                  > What am I missing? How can I achieve balanced writes?
>          >      >                  >
>          >      >                  > Thanks,
>          >      >                  > Jozef
>          >      >                  >
>          >      >                  >
>          >      >
>          >
> 
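
Note on the shard/worker argument quoted above: Reuven's remark that 5
random shards on 5 workers will often leave some workers idle can be
checked with a small calculation. The sketch below is not Beam or Flink
code. It assumes each shard lands independently and uniformly at random
on one worker, which only approximates how hashed shard keys are
partitioned; the names ShardCoverage, choose and allWorkersUsed are made
up for this illustration.

```scala
object ShardCoverage {
  // Binomial coefficient C(n, k) as a Double (fine for small n).
  def choose(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  // Probability that `shards` shards, each placed independently and
  // uniformly at random on one of `workers` workers, leave no worker
  // without a shard. Computed by inclusion-exclusion over the number k
  // of workers that are forced to stay empty.
  // ASSUMPTION: uniform random placement, not Flink's real key-group logic.
  def allWorkersUsed(workers: Int, shards: Int): Double =
    (0 to workers).map { k =>
      val sign = if (k % 2 == 0) 1.0 else -1.0
      sign * choose(workers, k) *
        math.pow((workers - k).toDouble / workers, shards)
    }.sum

  def main(args: Array[String]): Unit = {
    println(s"5 shards on 5 workers:  ${allWorkersUsed(5, 5)}")
    println(s"50 shards on 5 workers: ${allWorkersUsed(5, 50)}")
  }
}
```

Under that assumption only about 3.8% of the 5-shard placements use all
5 workers (5!/5^5), while with 50 shards nearly every placement does.
Using every worker still does not imply an even load, which is
consistent with the 50-shard numbers quoted above.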

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
Oh ok, thanks for the pointer. Coming from Flink, the default is that 
the sharding is determined by the runtime distribution. Indeed, we will 
have to add an overwrite to the Flink Runner, similar to this one:

https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347

Jira issue: https://issues.apache.org/jira/browse/BEAM-5865

Thanks,
Max

On 25.10.18 22:37, Reuven Lax wrote:
> FYI the Dataflow runner automatically sets the default number of shards 
> (I believe to be 2 * num_workers). Probably we should do something 
> similar for the Flink runner.
> 
> This needs to be done by the runner, as # of workers is a runner 
> concept; the SDK itself has no concept of workers.
> 
> On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vilcek@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     If I do not specify shards for unbounded collection, I get
> 
>     Caused by: java.lang.IllegalArgumentException: When applying
>     WriteFiles to an unbounded PCollection, must specify number of
>     output shards explicitly
>              at
>     org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>              at
>     org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> 
>     Around same lines in WriteFiles is also a check for windowed writes.
>     I believe FileIO enables it explicitly when windowing is present. In
>     filesystem written files are per window and shard.
> 
>     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mxm@apache.org
>     <ma...@apache.org>> wrote:
> 
>         I agree it would be nice to keep the current distribution of
>         elements
>         instead of doing a shuffle based on an artificial shard key.
> 
>         Have you tried `withWindowedWrites()`? Also, why do you say you
>         need to
>         specify the number of shards in streaming mode?
> 
>         -Max
> 
>         On 25.10.18 10:12, Jozef Vilcek wrote:
>          > Hm, yes, this makes sense now, but what can be done for my
>         case? I do
>          > not want to end up with too many files on disk.
>          >
>          > I think what I am looking for is to instruct IO that do not
>         do again
>          > random shard and reshuffle but just assume number of shards
>         equal to
>          > number of workers and shard ID is a worker ID.
>          > Is this doable in beam model?
>          >
>          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels
>         <mxm@apache.org <ma...@apache.org>
>          > <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
>          >
>          >     The FlinkRunner uses a hash function (MurmurHash) on each
>         key which
>          >     places keys somewhere in the hash space. The hash space
>         (2^32) is split
>          >     among the partitions (5 in your case). Given enough keys,
>         the chance
>          >     increases they are equally spread.
>          >
>          >     This should be similar to what the other Runners do.
>          >
>          >     On 24.10.18 10:58, Jozef Vilcek wrote:
>          >      >
>          >      > So if I run 5 workers with 50 shards, I end up with:
>          >      >
>          >      > DurationBytes receivedRecords received
>          >      >   2m 39s        900 MB            465,525
>          >      >   2m 39s       1.76 GB            930,720
>          >      >   2m 39s        789 MB            407,315
>          >      >   2m 39s       1.32 GB            698,262
>          >      >   2m 39s        788 MB            407,310
>          >      >
>          >      > Still not good but better than with 5 shards where
>         some workers
>          >     did not
>          >      > participate at all.
>          >      > So, problem is in some layer which distributes keys /
>         shards
>          >     among workers?
>          >      >
>          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax
>         <relax@google.com <ma...@google.com>
>          >     <mailto:relax@google.com <ma...@google.com>>
>          >      > <mailto:relax@google.com <ma...@google.com>
>         <mailto:relax@google.com <ma...@google.com>>>> wrote:
>          >      >
>          >      >     withNumShards(5) generates 5 random shards. It
>         turns out that
>          >      >     statistically when you generate 5 random shards
>         and you have 5
>          >      >     works, the probability is reasonably high that
>         some workers
>          >     will get
>          >      >     more than one shard (and as a result not all
>         workers will
>          >      >     participate). Are you able to set the number of
>         shards larger
>          >     than 5?
>          >      >
>          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
>          >     <jozo.vilcek@gmail.com <ma...@gmail.com>
>         <mailto:jozo.vilcek@gmail.com <ma...@gmail.com>>
>          >      >     <mailto:jozo.vilcek@gmail.com
>         <ma...@gmail.com>
>          >     <mailto:jozo.vilcek@gmail.com
>         <ma...@gmail.com>>>> wrote:
>          >      >
>          >      >         cc (dev)
>          >      >
>          >      >         I tried to run the example with FlinkRunner in
>         batch mode and
>          >      >         received again bad data spread among the workers.
>          >      >
>          >      >         When I tried to remove number of shards for
>         batch mode in
>          >     above
>          >      >         example, pipeline crashed before launch
>          >      >
>          >      >         Caused by: java.lang.IllegalStateException:
>         Inputs to Flatten
>          >      >         had incompatible triggers:
>          >      >
>          >     
>           AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
>          >      >         entCountAtLeast(10000)),
>          >      >
>          >     
>           Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
>          >      >         hour)))),
>          >      >
>          >     
>           AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
>          >      >         rever(AfterPane.elementCountAtLeast(1)),
>          >      >
>          >     
>           Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>          >      >
>          >      >
>          >      >
>          >      >
>          >      >
>          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
>          >      >         <jozo.vilcek@gmail.com
>         <ma...@gmail.com> <mailto:jozo.vilcek@gmail.com
>         <ma...@gmail.com>>
>          >     <mailto:jozo.vilcek@gmail.com
>         <ma...@gmail.com> <mailto:jozo.vilcek@gmail.com
>         <ma...@gmail.com>>>> wrote:
>          >      >
>          >      >             Hi Max,
>          >      >
>          >      >             I forgot to mention that example is run in
>         streaming
>          >     mode,
>          >      >             therefore I can not do writes without
>         specifying shards.
>          >      >             FileIO explicitly asks for them.
>          >      >
>          >      >             I am not sure where the problem is.
>         FlinkRunner is
>          >     only one
>          >      >             I used.
>          >      >
>          >      >             On Tue, Oct 23, 2018 at 11:27 AM
>         Maximilian Michels
>          >      >             <mxm@apache.org <ma...@apache.org>
>         <mailto:mxm@apache.org <ma...@apache.org>>
>          >     <mailto:mxm@apache.org <ma...@apache.org>
>         <mailto:mxm@apache.org <ma...@apache.org>>>> wrote:
>          >      >
>          >      >                 Hi Jozef,
>          >      >
>          >      >                 This does not look like a FlinkRunner
>         related
>          >     problem,
>          >      >                 but is caused by
>          >      >                 the `WriteFiles` sharding logic. It
>         assigns keys and
>          >      >                 does a Reshuffle
>          >      >                 which apparently does not lead to good
>         data spread in
>          >      >                 your case.
>          >      >
>          >      >                 Do you see the same behavior without
>          >     `withNumShards(5)`?
>          >      >
>          >      >                 Thanks,
>          >      >                 Max
>          >      >
>          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>          >      >                  > Hello,
>          >      >                  >
>          >      >                  > I am having some trouble to get a
>         balanced
>          >     write via
>          >      >                 FileIO. Workers at
>          >      >                  > the shuffle side where data per
>         window fire are
>          >      >                 written to the
>          >      >                  > filesystem receive unbalanced
>         number of events.
>          >      >                  >
>          >      >                  > Here is a naive code example:
>          >      >                  >
>          >      >                  >      val read = KafkaIO.read()
>          >      >                  >          .withTopic("topic")
>          >      >                  >         
>         .withBootstrapServers("kafka1:9092")
>          >      >                  >
>          >      >               
>           .withKeyDeserializer(classOf[ByteArrayDeserializer])
>          >      >                  >
>          >      >
>          >       .withValueDeserializer(classOf[ByteArrayDeserializer])
>          >      >                  >          .withProcessingTime()
>          >      >                  >
>          >      >                  >      pipeline
>          >      >                  >          .apply(read)
>          >      >                  >          .apply(MapElements.via(new
>          >      >                  > SimpleFunction[KafkaRecord[Array[Byte],
>          >     Array[Byte]],
>          >      >                 String]() {
>          >      >                  >            override def apply(input:
>          >      >                 KafkaRecord[Array[Byte],
>          >      >                  > Array[Byte]]): String = {
>          >      >                  >              new
>         String(input.getKV.getValue,
>          >     "UTF-8")
>          >      >                  >            }
>          >      >                  >          }))
>          >      >                  >
>          >      >                  >
>          >      >                  >
>          >      >
>          >     
>           .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>          >      >                  >
>          >     .triggering(AfterWatermark.pastEndOfWindow()
>          >      >                  >
>          >      >
>          >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>          >      >                  >
>          >      >
>          >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>          >      >                  >
>          >      >
>          >       Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>          >      >                  >
>          >      >                  >
>          >      >
>          >     
>           Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>          >      >                  >              .discardingFiredPanes()
>          >      >                  >
>          >      >               
>           .withAllowedLateness(Duration.standardDays(7)))
>          >      >                  >
>          >      >                  >          .apply(FileIO.write()
>          >      >                  >              .via(TextIO.sink())
>          >      >                  >              .withNaming(new
>          >      >                 SafeFileNaming(outputPath, ".txt"))
>          >      >                  >             
>         .withTempDirectory(tempLocation)
>          >      >                  >              .withNumShards(5))
>          >      >                  >
>          >      >                  >
>          >      >                  > If I run this on Beam 2.6.0 with
>         Flink 1.5.0 on 5
>          >      >                 workers (equal to
>          >      >                  > number of shards), I would expect
>         that each worker
>          >      >                 will participate on
>          >      >                  > persisting shards and equally,
>         since code uses
>          >     fixed
>          >      >                 number of shards
>          >      >                  > (and random shard assign?). But
>         reality is
>          >     different
>          >      >                 (see 2 attachments
>          >      >                  > - statistics from flink task
>         reading from
>          >     kafka and
>          >      >                 task writing to files)
>          >      >                  >
>          >      >                  > What am I missing? How to achieve
>         balanced writes?
>          >      >                  >
>          >      >                  > Thanks,
>          >      >                  > Jozef
>          >      >                  >
>          >      >                  >
>          >      >
>          >
> 

Re: Unbalanced FileIO writes on Flink

Posted by Reuven Lax <re...@google.com>.
FYI, the Dataflow runner automatically sets the default number of shards (I
believe to 2 * num_workers). We should probably do something similar for
the Flink runner.

This needs to be done by the runner, as the number of workers is a runner
concept; the SDK itself has no concept of workers.
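
To illustrate why a default above the worker count helps, here is a minimal sketch of the arithmetic. It assumes each shard lands on a worker uniformly and independently, which only approximates hash-based key placement. The class and method names are hypothetical and not part of Beam or any runner.

```java
// Hypothetical helper for illustration; not part of Beam or any runner.
final class ShardDefaults {
    // Default mentioned in this thread for Dataflow ("I believe"): two shards per worker.
    static int defaultNumShards(int numWorkers) {
        if (numWorkers <= 0) {
            throw new IllegalArgumentException("numWorkers must be positive");
        }
        return 2 * numWorkers;
    }

    // Expected number of workers that receive no shard when each shard
    // independently picks one of the workers uniformly at random:
    // W * (1 - 1/W)^S. The uniformity is an assumption of this sketch.
    static double expectedIdleWorkers(int numShards, int numWorkers) {
        return numWorkers * Math.pow(1.0 - 1.0 / numWorkers, numShards);
    }

    public static void main(String[] args) {
        int workers = 5;
        for (int shards : new int[] {workers, defaultNumShards(workers), 50}) {
            System.out.printf(
                "shards=%d expected idle workers=%.4f%n",
                shards, expectedIdleWorkers(shards, workers));
        }
    }
}
```

Under that assumption, with 5 workers about 1.64 workers are expected to receive nothing when there are 5 shards, and about 0.54 when there are 10.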

On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jo...@gmail.com> wrote:

> If I do not specify the number of shards for an unbounded collection, I get:
>
> Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to
> an unbounded PCollection, must specify number of output shards explicitly
>         at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>         at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>
> Around the same lines in WriteFiles there is also a check for windowed
> writes. I believe FileIO enables them explicitly when windowing is present.
> On the filesystem, files are written per window and shard.
>
> On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mx...@apache.org>
> wrote:
>
>> I agree it would be nice to keep the current distribution of elements
>> instead of doing a shuffle based on an artificial shard key.
>>
>> Have you tried `withWindowedWrites()`? Also, why do you say you need to
>> specify the number of shards in streaming mode?
>>
>> -Max
>>
>> On 25.10.18 10:12, Jozef Vilcek wrote:
>> > Hm, yes, this makes sense now, but what can be done for my case? I do
>> > not want to end up with too many files on disk.
>> >
>> > I think what I am looking for is a way to instruct the IO not to do
>> > the random sharding and reshuffle again, but simply to assume that the
>> > number of shards equals the number of workers and that the shard ID is
>> > the worker ID. Is this doable in the Beam model?
>> >
>> > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org
>> > <ma...@apache.org>> wrote:
>> >
>> >     The FlinkRunner uses a hash function (MurmurHash) on each key, which
>> >     places keys somewhere in the hash space. The hash space (2^32) is
>> >     split among the partitions (5 in your case). Given enough keys, the
>> >     chance increases that they are spread evenly.
>> >
>> >     This should be similar to what the other Runners do.
>> >
>> >     On 24.10.18 10:58, Jozef Vilcek wrote:
>> >      >
>> >      > So if I run 5 workers with 50 shards, I end up with:
>> >      >
>> >      >   Duration   Bytes received   Records received
>> >      >   2m 39s        900 MB            465,525
>> >      >   2m 39s       1.76 GB            930,720
>> >      >   2m 39s        789 MB            407,315
>> >      >   2m 39s       1.32 GB            698,262
>> >      >   2m 39s        788 MB            407,310
>> >      >
>> >      > Still not good but better than with 5 shards where some workers
>> >     did not
>> >      > participate at all.
>> >      > So, problem is in some layer which distributes keys / shards
>> >     among workers?
>> >      >
>> >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com
>> >     <ma...@google.com>
>> >      > <mailto:relax@google.com <ma...@google.com>>> wrote:
>> >      >
>> >      >     withNumShards(5) generates 5 random shards. It turns out that
>> >      >     statistically when you generate 5 random shards and you have 5
>> >      >     workers, the probability is reasonably high that some workers
>> >     will get
>> >      >     more than one shard (and as a result not all workers will
>> >      >     participate). Are you able to set the number of shards larger
>> >     than 5?
>> >      >
>> >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
>> >     <jozo.vilcek@gmail.com <ma...@gmail.com>
>> >      >     <mailto:jozo.vilcek@gmail.com
>> >     <ma...@gmail.com>>> wrote:
>> >      >
>> >      >         cc (dev)
>> >      >
>> >      >         I tried to run the example with FlinkRunner in batch
>> mode and
>> >      >         received again bad data spread among the workers.
>> >      >
>> >      >         When I tried to remove number of shards for batch mode in
>> >     above
>> >      >         example, pipeline crashed before launch
>> >      >
>> >      >         Caused by: java.lang.IllegalStateException: Inputs to
>> Flatten
>> >      >         had incompatible triggers:
>> >      >
>> >
>>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
>> >      >         entCountAtLeast(10000)),
>> >      >
>> >
>>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
>> >      >         hour)))),
>> >      >
>> >
>>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
>> >      >         rever(AfterPane.elementCountAtLeast(1)),
>> >      >
>> >
>>  Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>> >      >
>> >      >
>> >      >
>> >      >
>> >      >
>> >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
>> >      >         <jozo.vilcek@gmail.com <ma...@gmail.com>
>> >     <mailto:jozo.vilcek@gmail.com <ma...@gmail.com>>>
>> wrote:
>> >      >
>> >      >             Hi Max,
>> >      >
>> >      >             I forgot to mention that example is run in streaming
>> >     mode,
>> >      >             therefore I can not do writes without specifying
>> shards.
>> >      >             FileIO explicitly asks for them.
>> >      >
>> >      >             I am not sure where the problem is. FlinkRunner is
>> >     only one
>> >      >             I used.
>> >      >
>> >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
>> >      >             <mxm@apache.org <ma...@apache.org>
>> >     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
>> >      >
>> >      >                 Hi Jozef,
>> >      >
>> >      >                 This does not look like a FlinkRunner related
>> >     problem,
>> >      >                 but is caused by
>> >      >                 the `WriteFiles` sharding logic. It assigns keys
>> and
>> >      >                 does a Reshuffle
>> >      >                 which apparently does not lead to good data
>> spread in
>> >      >                 your case.
>> >      >
>> >      >                 Do you see the same behavior without
>> >     `withNumShards(5)`?
>> >      >
>> >      >                 Thanks,
>> >      >                 Max
>> >      >
>> >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>> >      >                  > Hello,
>> >      >                  >
>> >      >                  > I am having some trouble to get a balanced
>> >     write via
>> >      >                 FileIO. Workers at
>> >      >                  > the shuffle side where data per window fire
>> are
>> >      >                 written to the
>> >      >                  > filesystem receive unbalanced number of
>> events.
>> >      >                  >
>> >      >                  > Here is a naive code example:
>> >      >                  >
>> >      >                  >      val read = KafkaIO.read()
>> >      >                  >          .withTopic("topic")
>> >      >                  >          .withBootstrapServers("kafka1:9092")
>> >      >                  >
>> >      >
>>  .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >      >                  >
>> >      >
>> >       .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >      >                  >          .withProcessingTime()
>> >      >                  >
>> >      >                  >      pipeline
>> >      >                  >          .apply(read)
>> >      >                  >          .apply(MapElements.via(new
>> >      >                  > SimpleFunction[KafkaRecord[Array[Byte],
>> >     Array[Byte]],
>> >      >                 String]() {
>> >      >                  >            override def apply(input:
>> >      >                 KafkaRecord[Array[Byte],
>> >      >                  > Array[Byte]]): String = {
>> >      >                  >              new String(input.getKV.getValue,
>> >     "UTF-8")
>> >      >                  >            }
>> >      >                  >          }))
>> >      >                  >
>> >      >                  >
>> >      >                  >
>> >      >
>> >
>>  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >      >                  >
>> >     .triggering(AfterWatermark.pastEndOfWindow()
>> >      >                  >
>> >      >
>> >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >      >                  >
>> >      >
>> >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >      >                  >
>> >      >
>> >       Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >      >                  >
>> >      >                  >
>> >      >
>> >
>>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>> >      >                  >              .discardingFiredPanes()
>> >      >                  >
>> >      >                 .withAllowedLateness(Duration.standardDays(7)))
>> >      >                  >
>> >      >                  >          .apply(FileIO.write()
>> >      >                  >              .via(TextIO.sink())
>> >      >                  >              .withNaming(new
>> >      >                 SafeFileNaming(outputPath, ".txt"))
>> >      >                  >              .withTempDirectory(tempLocation)
>> >      >                  >              .withNumShards(5))
>> >      >                  >
>> >      >                  >
>> >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0
>> on 5
>> >      >                 workers (equal to
>> >      >                  > number of shards), I would expect that each
>> worker
>> >      >                 will participate on
>> >      >                  > persisting shards and equally, since code uses
>> >     fixed
>> >      >                 number of shards
>> >      >                  > (and random shard assign?). But reality is
>> >     different
>> >      >                 (see 2 attachments
>> >      >                  > - statistics from flink task reading from
>> >     kafka and
>> >      >                 task writing to files)
>> >      >                  >
>> >      >                  > What am I missing? How to achieve balanced
>> writes?
>> >      >                  >
>> >      >                  > Thanks,
>> >      >                  > Jozef
>> >      >                  >
>> >      >                  >
>> >      >
>> >
>>
>

Re: Unbalanced FileIO writes on Flink

Posted by Jozef Vilcek <jo...@gmail.com>.
If I do not specify the number of shards for an unbounded collection, I get:

Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to
an unbounded PCollection, must specify number of output shards explicitly
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
        at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)

Around the same lines in WriteFiles there is also a check for windowed
writes. I believe FileIO enables them explicitly when windowing is present.
On the filesystem, files are written per window and shard.
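
The two checks described above can be paraphrased as follows. This is a stand-alone sketch, not the Beam source: the class, method, and parameter names are hypothetical, the shard message is copied from the stack trace, and the wording of the windowed-writes message is an assumption.

```java
// Stand-alone paraphrase of the validation discussed above; NOT the Beam source.
final class UnboundedWriteChecks {
    static void validate(boolean unboundedInput, boolean windowedWrites, Integer numShards) {
        if (!unboundedInput) {
            return; // bounded input: neither check applies in this sketch
        }
        if (!windowedWrites) {
            // Message wording is an assumption made for illustration.
            throw new IllegalArgumentException("Unbounded input requires windowed writes");
        }
        if (numShards == null) {
            // Message text taken from the stack trace quoted above.
            throw new IllegalArgumentException(
                "When applying WriteFiles to an unbounded PCollection, "
                    + "must specify number of output shards explicitly");
        }
    }

    public static void main(String[] args) {
        validate(true, true, 5); // passes: windowed writes with an explicit shard count
        try {
            validate(true, true, null); // fails: no shard count on unbounded input
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```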

On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <mx...@apache.org> wrote:

> I agree it would be nice to keep the current distribution of elements
> instead of doing a shuffle based on an artificial shard key.
>
> Have you tried `withWindowedWrites()`? Also, why do you say you need to
> specify the number of shards in streaming mode?
>
> -Max
>
> On 25.10.18 10:12, Jozef Vilcek wrote:
> > Hm, yes, this makes sense now, but what can be done for my case? I do
> > not want to end up with too many files on disk.
> >
> > I think what I am looking for is a way to instruct the IO not to do
> > the random sharding and reshuffle again, but simply to assume that the
> > number of shards equals the number of workers and that the shard ID is
> > the worker ID. Is this doable in the Beam model?
> >
> > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org
> > <ma...@apache.org>> wrote:
> >
> >     The FlinkRunner uses a hash function (MurmurHash) on each key, which
> >     places keys somewhere in the hash space. The hash space (2^32) is
> >     split among the partitions (5 in your case). Given enough keys, the
> >     chance increases that they are spread evenly.
> >
> >     This should be similar to what the other Runners do.
> >
> >     On 24.10.18 10:58, Jozef Vilcek wrote:
> >      >
> >      > So if I run 5 workers with 50 shards, I end up with:
> >      >
> >      >   Duration   Bytes received   Records received
> >      >   2m 39s        900 MB            465,525
> >      >   2m 39s       1.76 GB            930,720
> >      >   2m 39s        789 MB            407,315
> >      >   2m 39s       1.32 GB            698,262
> >      >   2m 39s        788 MB            407,310
> >      >
> >      > Still not good but better than with 5 shards where some workers
> >     did not
> >      > participate at all.
> >      > So, problem is in some layer which distributes keys / shards
> >     among workers?
> >      >
> >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com
> >     <ma...@google.com>
> >      > <mailto:relax@google.com <ma...@google.com>>> wrote:
> >      >
> >      >     withNumShards(5) generates 5 random shards. It turns out that
> >      >     statistically when you generate 5 random shards and you have 5
> >      >     workers, the probability is reasonably high that some workers
> >     will get
> >      >     more than one shard (and as a result not all workers will
> >      >     participate). Are you able to set the number of shards larger
> >     than 5?
> >      >
> >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
> >     <jozo.vilcek@gmail.com <ma...@gmail.com>
> >      >     <mailto:jozo.vilcek@gmail.com
> >     <ma...@gmail.com>>> wrote:
> >      >
> >      >         cc (dev)
> >      >
> >      >         I tried to run the example with FlinkRunner in batch mode
> and
> >      >         received again bad data spread among the workers.
> >      >
> >      >         When I tried to remove number of shards for batch mode in
> >     above
> >      >         example, pipeline crashed before launch
> >      >
> >      >         Caused by: java.lang.IllegalStateException: Inputs to
> Flatten
> >      >         had incompatible triggers:
> >      >
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> >      >         entCountAtLeast(10000)),
> >      >
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> >      >         hour)))),
> >      >
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> >      >         rever(AfterPane.elementCountAtLeast(1)),
> >      >
> >
>  Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >      >
> >      >
> >      >
> >      >
> >      >
> >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> >      >         <jozo.vilcek@gmail.com <ma...@gmail.com>
> >     <mailto:jozo.vilcek@gmail.com <ma...@gmail.com>>>
> wrote:
> >      >
> >      >             Hi Max,
> >      >
> >      >             I forgot to mention that example is run in streaming
> >     mode,
> >      >             therefore I can not do writes without specifying
> shards.
> >      >             FileIO explicitly asks for them.
> >      >
> >      >             I am not sure where the problem is. FlinkRunner is
> >     only one
> >      >             I used.
> >      >
> >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
> >      >             <mxm@apache.org <ma...@apache.org>
> >     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
> >      >
> >      >                 Hi Jozef,
> >      >
> >      >                 This does not look like a FlinkRunner related
> >     problem,
> >      >                 but is caused by
> >      >                 the `WriteFiles` sharding logic. It assigns keys
> and
> >      >                 does a Reshuffle
> >      >                 which apparently does not lead to good data
> spread in
> >      >                 your case.
> >      >
> >      >                 Do you see the same behavior without
> >     `withNumShards(5)`?
> >      >
> >      >                 Thanks,
> >      >                 Max
> >      >
> >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
> >      >                  > Hello,
> >      >                  >
> >      >                  > I am having some trouble to get a balanced
> >     write via
> >      >                 FileIO. Workers at
> >      >                  > the shuffle side where data per window fire are
> >      >                 written to the
> >      >                  > filesystem receive unbalanced number of events.
> >      >                  >
> >      >                  > Here is a naive code example:
> >      >                  >
> >      >                  >      val read = KafkaIO.read()
> >      >                  >          .withTopic("topic")
> >      >                  >          .withBootstrapServers("kafka1:9092")
> >      >                  >
> >      >
>  .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >      >                  >
> >      >
> >       .withValueDeserializer(classOf[ByteArrayDeserializer])
> >      >                  >          .withProcessingTime()
> >      >                  >
> >      >                  >      pipeline
> >      >                  >          .apply(read)
> >      >                  >          .apply(MapElements.via(new
> >      >                  > SimpleFunction[KafkaRecord[Array[Byte],
> >     Array[Byte]],
> >      >                 String]() {
> >      >                  >            override def apply(input:
> >      >                 KafkaRecord[Array[Byte],
> >      >                  > Array[Byte]]): String = {
> >      >                  >              new String(input.getKV.getValue,
> >     "UTF-8")
> >      >                  >            }
> >      >                  >          }))
> >      >                  >
> >      >                  >
> >      >                  >
> >      >
> >
>  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >      >                  >
> >     .triggering(AfterWatermark.pastEndOfWindow()
> >      >                  >
> >      >
> >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >      >                  >
> >      >
> >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >      >                  >
> >      >
> >       Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >      >                  >
> >      >                  >
> >      >
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >      >                  >              .discardingFiredPanes()
> >      >                  >
> >      >                 .withAllowedLateness(Duration.standardDays(7)))
> >      >                  >
> >      >                  >          .apply(FileIO.write()
> >      >                  >              .via(TextIO.sink())
> >      >                  >              .withNaming(new
> >      >                 SafeFileNaming(outputPath, ".txt"))
> >      >                  >              .withTempDirectory(tempLocation)
> >      >                  >              .withNumShards(5))
> >      >                  >
> >      >                  >
> >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0
> on 5
> >      >                 workers (equal to
> >      >                  > number of shards), I would expect that each
> worker
> >      >                 will participate on
> >      >                  > persisting shards and equally, since code uses
> >     fixed
> >      >                 number of shards
> >      >                  > (and random shard assign?). But reality is
> >     different
> >      >                 (see 2 attachments
> >      >                  > - statistics from flink task reading from
> >     kafka and
> >      >                 task writing to files)
> >      >                  >
> >      >                  > What am I missing? How to achieve balanced
> writes?
> >      >                  >
> >      >                  > Thanks,
> >      >                  > Jozef
> >      >                  >
> >      >                  >
> >      >
> >
>

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
I agree it would be nice to keep the current distribution of elements 
instead of doing a shuffle based on an artificial shard key.

Have you tried `withWindowedWrites()`? Also, why do you say you need to 
specify the number of shards in streaming mode?

-Max
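
To make the "shuffle based on an artificial shard key" concrete, here is a minimal Scala sketch of the mechanism discussed in this thread: a shard number is used as a key, and a hash of that key picks the worker. It is only an illustration under stated assumptions, not Beam's `WriteFiles` code or Flink's actual key-group assignment. The names `ShardKeySketch`, `workerFor` and `shardsPerWorker`, the use of Scala's `MurmurHash3.stringHash`, and the plain modulo mapping are all hypothetical choices made for the example.

```scala
import scala.util.hashing.MurmurHash3

object ShardKeySketch {
  // Hypothetical mapping: hash the shard key, then fold the hash into [0, numWorkers).
  def workerFor(shard: Int, numWorkers: Int): Int =
    java.lang.Math.floorMod(MurmurHash3.stringHash(shard.toString), numWorkers)

  // Count how many of the `numShards` shard keys land on each worker.
  def shardsPerWorker(numShards: Int, numWorkers: Int): Vector[Int] = {
    val counts = Array.fill(numWorkers)(0)
    (0 until numShards).foreach(s => counts(workerFor(s, numWorkers)) += 1)
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // With as many shards as workers, some workers may well receive no shard.
    println(shardsPerWorker(5, 5).mkString(", "))
    // With more shards every worker is likely to get some, but not equally many.
    println(shardsPerWorker(50, 5).mkString(", "))
  }
}
```

Nothing in such a scheme ties a shard to the worker that produced the data, which is why the existing distribution of elements is lost after the shuffle.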

On 25.10.18 10:12, Jozef Vilcek wrote:
> Hm, yes, this makes sense now, but what can be done for my case? I do 
> not want to end up with too many files on disk.
> 
> I think what I am looking for is a way to tell the IO not to assign random
> shards and reshuffle again, but to assume that the number of shards equals
> the number of workers and that the shard ID is the worker ID.
> Is this doable in the Beam model?
> 
> On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mxm@apache.org 
> <ma...@apache.org>> wrote:
> 
>     The FlinkRunner uses a hash function (MurmurHash) on each key which
>     places keys somewhere in the hash space. The hash space (2^32) is split
>     among the partitions (5 in your case). Given enough keys, the chance
>     that they are spread evenly increases.
> 
>     This should be similar to what the other Runners do.
> 
>     On 24.10.18 10:58, Jozef Vilcek wrote:
>      >
>      > So if I run 5 workers with 50 shards, I end up with:
>      >
>      > Duration   Bytes received   Records received
>      > 2m 39s     900 MB           465,525
>      > 2m 39s     1.76 GB          930,720
>      > 2m 39s     789 MB           407,315
>      > 2m 39s     1.32 GB          698,262
>      > 2m 39s     788 MB           407,310
>      >
>      > Still not good but better than with 5 shards where some workers
>     did not
>      > participate at all.
>      > So, problem is in some layer which distributes keys / shards
>     among workers?
>      >
>      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com
>     <ma...@google.com>
>      > <mailto:relax@google.com <ma...@google.com>>> wrote:
>      >
>      >     withNumShards(5) generates 5 random shards. It turns out that
>      >     statistically when you generate 5 random shards and you have 5
>      >     workers, the probability is reasonably high that some workers
>      >     will get more than one shard (and as a result not all workers will
>      >     participate). Are you able to set the number of shards larger
>      >     than 5?
>      >
>      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
>     <jozo.vilcek@gmail.com <ma...@gmail.com>
>      >     <mailto:jozo.vilcek@gmail.com
>     <ma...@gmail.com>>> wrote:
>      >
>      >         cc (dev)
>      >
>      >         I tried to run the example with FlinkRunner in batch mode and
>      >         received again bad data spread among the workers.
>      >
>      >         When I tried to remove number of shards for batch mode in
>     above
>      >         example, pipeline crashed before launch
>      >
>      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
>      >         had incompatible triggers:
>      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>      >
>      >
>      >
>      >
>      >
>      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
>      >         <jozo.vilcek@gmail.com <ma...@gmail.com>
>     <mailto:jozo.vilcek@gmail.com <ma...@gmail.com>>> wrote:
>      >
>      >             Hi Max,
>      >
>      >             I forgot to mention that example is run in streaming
>     mode,
>      >             therefore I can not do writes without specifying shards.
>      >             FileIO explicitly asks for them.
>      >
>      >             I am not sure where the problem is. FlinkRunner is
>     only one
>      >             I used.
>      >
>      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
>      >             <mxm@apache.org <ma...@apache.org>
>     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
>      >
>      >                 Hi Jozef,
>      >
>      >                 This does not look like a FlinkRunner related
>     problem,
>      >                 but is caused by
>      >                 the `WriteFiles` sharding logic. It assigns keys and
>      >                 does a Reshuffle
>      >                 which apparently does not lead to good data spread in
>      >                 your case.
>      >
>      >                 Do you see the same behavior without
>     `withNumShards(5)`?
>      >
>      >                 Thanks,
>      >                 Max
>      >
>      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
>      >                  > Hello,
>      >                  >
>      >                  > I am having some trouble to get a balanced
>     write via
>      >                 FileIO. Workers at
>      >                  > the shuffle side where data per window fire are
>      >                 written to the
>      >                  > filesystem receive unbalanced number of events.
>      >                  >
>      >                  > Here is a naive code example:
>      >                  >
>      >                  >      val read = KafkaIO.read()
>      >                  >          .withTopic("topic")
>      >                  >          .withBootstrapServers("kafka1:9092")
>      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>      >                  >          .withProcessingTime()
>      >                  >
>      >                  >      pipeline
>      >                  >          .apply(read)
>      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>      >                  >              new String(input.getKV.getValue, "UTF-8")
>      >                  >            }
>      >                  >          }))
>      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
>      >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>      >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>      >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>      >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>      >                  >              .discardingFiredPanes()
>      >                  >              .withAllowedLateness(Duration.standardDays(7)))
>      >                  >          .apply(FileIO.write()
>      >                  >              .via(TextIO.sink())
>      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>      >                  >              .withTempDirectory(tempLocation)
>      >                  >              .withNumShards(5))
>      >                  >
>      >                  >
>      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>      >                  > number of shards), I would expect every worker to take part in persisting
>      >                  > shards, and equally so, since the code uses a fixed number of shards (and
>      >                  > random shard assignment?). But the reality is different (see the 2
>      >                  > attachments: statistics from the Flink task reading from Kafka and the
>      >                  > task writing to files).
>      >                  >
>      >                  > What am I missing? How can I achieve balanced writes?
>      >                  >
>      >                  > Thanks,
>      >                  > Jozef
>      >                  >
>      >                  >
>      >
> 
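
The statistical argument quoted above (5 random shards on 5 workers) can be checked with a small calculation. The Scala sketch below assumes an idealised model that is not taken from Beam or Flink: every shard lands on one of the workers independently and uniformly at random. The object and method names are hypothetical. Under that assumption, the chance that all 5 workers receive a shard is 5!/5^5 = 0.0384, and on average about 1.64 workers stay idle; with 50 shards the chance of an idle worker becomes very small, although the load can still be uneven.

```scala
object ShardSpread {
  // Binomial coefficient C(n, k) as a Double (exact for the small n used here).
  def binom(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, j) => acc * (n - k + j) / j)

  // Probability that each of `workers` workers receives at least one of `shards`
  // shards, assuming every shard lands on a worker independently and uniformly.
  // Computed by inclusion-exclusion over the set of workers that stay empty.
  def allWorkersBusy(shards: Int, workers: Int): Double =
    (0 to workers).map { i =>
      val sign = if (i % 2 == 0) 1.0 else -1.0
      sign * binom(workers, i) * math.pow((workers - i).toDouble / workers, shards)
    }.sum

  // Expected number of workers that receive no shard at all.
  def expectedIdle(shards: Int, workers: Int): Double =
    workers * math.pow(1.0 - 1.0 / workers, shards)

  def main(args: Array[String]): Unit = {
    println(f"5 shards:  P(all busy) = ${allWorkersBusy(5, 5)}%.4f, idle = ${expectedIdle(5, 5)}%.4f")
    println(f"50 shards: P(all busy) = ${allWorkersBusy(50, 5)}%.4f, idle = ${expectedIdle(50, 5)}%.4f")
  }
}
```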

Re: Unbalanced FileIO writes on Flink

Posted by Jozef Vilcek <jo...@gmail.com>.
Hm, yes, this makes sense now, but what can be done for my case? I do not
want to end up with too many files on disk.

I think what I am looking for is a way to tell the IO not to assign random
shards and reshuffle again, but to assume that the number of shards equals
the number of workers and that the shard ID is the worker ID.
Is this doable in the Beam model?

On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mx...@apache.org> wrote:

> The FlinkRunner uses a hash function (MurmurHash) on each key which
> places keys somewhere in the hash space. The hash space (2^32) is split
> among the partitions (5 in your case). Given enough keys, the chance
> that they are spread evenly increases.
>
> This should be similar to what the other Runners do.
>
> On 24.10.18 10:58, Jozef Vilcek wrote:
> >
> > So if I run 5 workers with 50 shards, I end up with:
> >
> > Duration   Bytes received   Records received
> > 2m 39s     900 MB           465,525
> > 2m 39s     1.76 GB          930,720
> > 2m 39s     789 MB           407,315
> > 2m 39s     1.32 GB          698,262
> > 2m 39s     788 MB           407,310
> >
> > Still not good but better than with 5 shards where some workers did not
> > participate at all.
> > So, problem is in some layer which distributes keys / shards among
> workers?
> >
> > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com
> > <ma...@google.com>> wrote:
> >
> >     withNumShards(5) generates 5 random shards. It turns out that
> >     statistically when you generate 5 random shards and you have 5
> >     workers, the probability is reasonably high that some workers will get
> >     more than one shard (and as a result not all workers will
> >     participate). Are you able to set the number of shards larger than 5?
> >
> >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
> >     <ma...@gmail.com>> wrote:
> >
> >         cc (dev)
> >
> >         I tried to run the example with FlinkRunner in batch mode and
> >         received again bad data spread among the workers.
> >
> >         When I tried to remove number of shards for batch mode in above
> >         example, pipeline crashed before launch
> >
> >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
> >         had incompatible triggers:
> >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >
> >
> >
> >
> >
> >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> >         <jozo.vilcek@gmail.com <ma...@gmail.com>> wrote:
> >
> >             Hi Max,
> >
> >             I forgot to mention that example is run in streaming mode,
> >             therefore I can not do writes without specifying shards.
> >             FileIO explicitly asks for them.
> >
> >             I am not sure where the problem is. FlinkRunner is only one
> >             I used.
> >
> >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
> >             <mxm@apache.org <ma...@apache.org>> wrote:
> >
> >                 Hi Jozef,
> >
> >                 This does not look like a FlinkRunner related problem,
> >                 but is caused by
> >                 the `WriteFiles` sharding logic. It assigns keys and
> >                 does a Reshuffle
> >                 which apparently does not lead to good data spread in
> >                 your case.
> >
> >                 Do you see the same behavior without `withNumShards(5)`?
> >
> >                 Thanks,
> >                 Max
> >
> >                 On 22.10.18 11:57, Jozef Vilcek wrote:
> >                  > Hello,
> >                  >
> >                  > I am having some trouble to get a balanced write via
> >                 FileIO. Workers at
> >                  > the shuffle side where data per window fire are
> >                 written to the
> >                  > filesystem receive unbalanced number of events.
> >                  >
> >                  > Here is a naive code example:
> >                  >
> >                  >      val read = KafkaIO.read()
> >                  >          .withTopic("topic")
> >                  >          .withBootstrapServers("kafka1:9092")
> >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
> >                  >          .withProcessingTime()
> >                  >
> >                  >      pipeline
> >                  >          .apply(read)
> >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> >                  >              new String(input.getKV.getValue, "UTF-8")
> >                  >            }
> >                  >          }))
> >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >                  >              .triggering(AfterWatermark.pastEndOfWindow()
> >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >                  >              .discardingFiredPanes()
> >                  >              .withAllowedLateness(Duration.standardDays(7)))
> >                  >          .apply(FileIO.write()
> >                  >              .via(TextIO.sink())
> >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >                  >              .withTempDirectory(tempLocation)
> >                  >              .withNumShards(5))
> >                  >
> >                  >
> >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
> >                  > number of shards), I would expect every worker to take part in persisting
> >                  > shards, and equally so, since the code uses a fixed number of shards (and
> >                  > random shard assignment?). But the reality is different (see the 2
> >                  > attachments: statistics from the Flink task reading from Kafka and the
> >                  > task writing to files).
> >                  >
> >                  > What am I missing? How can I achieve balanced writes?
> >                  >
> >                  > Thanks,
> >                  > Jozef
> >                  >
> >                  >
> >
>
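
The record counts in the 50-shard table quoted above can be turned into an estimate of how many shards each worker handled. The Scala sketch below is a back-of-the-envelope calculation, not output from Flink or Beam: it assumes every shard carried roughly the same number of records, and the object and value names are hypothetical. Under that assumption the five workers handled about 8, 16, 7, 12 and 7 shards, which adds up to the 50 shards configured and suggests the skew comes from how shard keys map to workers rather than from uneven shard sizes.

```scala
object ShardShare {
  // Records received per worker, copied from the 50-shard run quoted above.
  val records: Vector[Long] = Vector(465525L, 930720L, 407315L, 698262L, 407310L)
  val numShards: Int = 50

  // Average records per shard, assuming all shards carry roughly the same load.
  val perShard: Double = records.sum.toDouble / numShards

  // Estimated number of shards each worker handled, rounded to the nearest integer.
  val shardsPerWorker: Vector[Long] = records.map(r => math.round(r / perShard))

  def main(args: Array[String]): Unit = {
    println(shardsPerWorker.mkString(", ")) // prints 8, 16, 7, 12, 7
  }
}
```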

Re: Unbalanced FileIO writes on Flink

Posted by Jozef Vilcek <jo...@gmail.com>.
Hm, yes, this makes sense now, but what can be done for my case? I do not
want to end up with too many files on disk.

I think what I am looking for is to instruct IO that do not do again random
shard and reshuffle but just assume number of shards equal to number of
workers and shard ID is a worker ID.
Is this doable in beam model?

On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <mx...@apache.org> wrote:

> The FlinkRunner uses a hash function (MurmurHash) on each key which
> places keys somewhere in the hash space. The hash space (2^32) is split
> among the partitions (5 in your case). Given enough keys, the chance
> increases they are equally spread.
>
> This should be similar to what the other Runners do.
>
> On 24.10.18 10:58, Jozef Vilcek wrote:
> >
> > So if I run 5 workers with 50 shards, I end up with:
> >
> > DurationBytes receivedRecords received
> >   2m 39s        900 MB            465,525
> >   2m 39s       1.76 GB            930,720
> >   2m 39s        789 MB            407,315
> >   2m 39s       1.32 GB            698,262
> >   2m 39s        788 MB            407,310
> >
> > Still not good but better than with 5 shards where some workers did not
> > participate at all.
> > So, problem is in some layer which distributes keys / shards among
> workers?
> >
> > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com
> > <ma...@google.com>> wrote:
> >
> >     withNumShards(5) generates 5 random shards. It turns out that
> >     statistically when you generate 5 random shards and you have 5
> >     works, the probability is reasonably high that some workers will get
> >     more than one shard (and as a result not all workers will
> >     participate). Are you able to set the number of shards larger than 5?
> >
> >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
> >     <ma...@gmail.com>> wrote:
> >
> >         cc (dev)
> >
> >         I tried to run the example with FlinkRunner in batch mode and
> >         received again bad data spread among the workers.
> >
> >         When I tried to remove number of shards for batch mode in above
> >         example, pipeline crashed before launch
> >
> >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
> >         had incompatible triggers:
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> >         entCountAtLeast(10000)),
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> >         hour)))),
> >
>  AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> >         rever(AfterPane.elementCountAtLeast(1)),
> >
>  Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >
> >
> >
> >
> >
> >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> >         <jozo.vilcek@gmail.com <ma...@gmail.com>> wrote:
> >
> >             Hi Max,
> >
> >             I forgot to mention that example is run in streaming mode,
> >             therefore I can not do writes without specifying shards.
> >             FileIO explicitly asks for them.
> >
> >             I am not sure where the problem is. FlinkRunner is only one
> >             I used.
> >
> >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
> >             <mxm@apache.org <ma...@apache.org>> wrote:
> >
> >                 Hi Jozef,
> >
> >                 This does not look like a FlinkRunner related problem,
> >                 but is caused by
> >                 the `WriteFiles` sharding logic. It assigns keys and
> >                 does a Reshuffle
> >                 which apparently does not lead to good data spread in
> >                 your case.
> >
> >                 Do you see the same behavior without `withNumShards(5)`?
> >
> >                 Thanks,
> >                 Max
> >
> >                 On 22.10.18 11:57, Jozef Vilcek wrote:
> >                  > Hello,
> >                  >
> >                  > I am having some trouble to get a balanced write via
> >                 FileIO. Workers at
> >                  > the shuffle side where data per window fire are
> >                 written to the
> >                  > filesystem receive unbalanced number of events.
> >                  >
> >                  > Here is a naive code example:
> >                  >
> >                  >      val read = KafkaIO.read()
> >                  >          .withTopic("topic")
> >                  >          .withBootstrapServers("kafka1:9092")
> >                  >
> >                 .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >                  >
> >                 .withValueDeserializer(classOf[ByteArrayDeserializer])
> >                  >          .withProcessingTime()
> >                  >
> >                  >      pipeline
> >                  >          .apply(read)
> >                  >          .apply(MapElements.via(new
> >                  > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]],
> >                 String]() {
> >                  >            override def apply(input:
> >                 KafkaRecord[Array[Byte],
> >                  > Array[Byte]]): String = {
> >                  >              new String(input.getKV.getValue, "UTF-8")
> >                  >            }
> >                  >          }))
> >                  >
> >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >                  >              .triggering(AfterWatermark.pastEndOfWindow()
> >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> >                  >              .discardingFiredPanes()
> >                  >              .withAllowedLateness(Duration.standardDays(7)))
> >                  >
> >                  >          .apply(FileIO.write()
> >                  >              .via(TextIO.sink())
> >                  >              .withNaming(new
> >                 SafeFileNaming(outputPath, ".txt"))
> >                  >              .withTempDirectory(tempLocation)
> >                  >              .withNumShards(5))
> >                  >
> >                  >
> >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5
> >                 workers (equal to
> >                  > number of shards), I would expect that each worker
> >                 will participate on
> >                  > persisting shards and equally, since code uses fixed
> >                 number of shards
> >                  > (and random shard assign?). But reality is different
> >                 (see 2 attachments
> >                  > - statistics from flink task reading from kafka and
> >                 task writing to files)
> >                  >
> >                  > What am I missing? How to achieve balanced writes?
> >                  >
> >                  > Thanks,
> >                  > Jozef
> >                  >
> >                  >
> >
>

Re: Unbalanced FileIO writes on Flink

Posted by Maximilian Michels <mx...@apache.org>.
The FlinkRunner uses a hash function (MurmurHash) on each key, which
places the key somewhere in the hash space. The hash space (2^32) is split
among the partitions (5 in your case). Given enough keys, the chance
increases that they are spread evenly.

This should be similar to what the other Runners do.
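
A toy sketch of that idea, to make the effect of the key count visible. It
is not Flink's actual partitioner: the object name HashSpread, the helpers
partitionOf and spread, and the "shard-N" key strings are made up for
illustration, and the hash is the MurmurHash3 from the Scala standard
library. It only shows how a 32-bit hash space cut into 5 equal slices
distributes a handful of keys versus many keys.

```scala
import scala.util.hashing.MurmurHash3

object HashSpread {
  // Map a 32-bit hash to one of `partitions` equal slices of the hash space.
  def partitionOf(hash: Int, partitions: Int): Int = {
    val unsigned = hash.toLong & 0xFFFFFFFFL // reinterpret as 0 .. 2^32 - 1
    ((unsigned * partitions) >>> 32).toInt   // slice index 0 .. partitions - 1
  }

  // Count how many of the hypothetical keys "shard-0", "shard-1", ...
  // land on each partition.
  def spread(numKeys: Int, partitions: Int): Vector[Int] = {
    val counts = Array.fill(partitions)(0)
    for (key <- 0 until numKeys) {
      val h = MurmurHash3.stringHash(s"shard-$key")
      counts(partitionOf(h, partitions)) += 1
    }
    counts.toVector
  }

  def main(args: Array[String]): Unit =
    for (n <- Seq(5, 50, 5000))
      println(s"$n keys -> ${spread(n, 5).mkString(", ")}")
}
```

With only 5 keys, some slices will typically stay empty; the counts should
even out as the number of keys grows.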

On 24.10.18 10:58, Jozef Vilcek wrote:
> 
> So if I run 5 workers with 50 shards, I end up with:
> 
> Duration  Bytes received  Records received
>   2m 39s        900 MB            465,525
>   2m 39s       1.76 GB            930,720
>   2m 39s        789 MB            407,315
>   2m 39s       1.32 GB            698,262
>   2m 39s        788 MB            407,310
> 
> Still not good but better than with 5 shards where some workers did not 
> participate at all.
> So, problem is in some layer which distributes keys / shards among workers?
> 
> On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <relax@google.com 
> <ma...@google.com>> wrote:
> 
>     withNumShards(5) generates 5 random shards. It turns out that
>     statistically when you generate 5 random shards and you have 5
>     workers, the probability is reasonably high that some workers will get
>     more than one shard (and as a result not all workers will
>     participate). Are you able to set the number of shards larger than 5?
> 
>     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vilcek@gmail.com
>     <ma...@gmail.com>> wrote:
> 
>         cc (dev)
> 
>         I tried to run the example with FlinkRunner in batch mode and
>         received again bad data spread among the workers.
> 
>         When I tried to remove number of shards for batch mode in above
>         example, pipeline crashed before launch
> 
>         Caused by: java.lang.IllegalStateException: Inputs to Flatten
>         had incompatible triggers:
>         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
>         entCountAtLeast(10000)),
>         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
>         hour)))),
>         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
>         rever(AfterPane.elementCountAtLeast(1)),
>         Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> 
> 
> 
> 
> 
>         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
>         <jozo.vilcek@gmail.com <ma...@gmail.com>> wrote:
> 
>             Hi Max,
> 
>             I forgot to mention that example is run in streaming mode,
>             therefore I can not do writes without specifying shards.
>             FileIO explicitly asks for them.
> 
>             I am not sure where the problem is. FlinkRunner is only one
>             I used.
> 
>             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
>             <mxm@apache.org <ma...@apache.org>> wrote:
> 
>                 Hi Jozef,
> 
>                 This does not look like a FlinkRunner related problem,
>                 but is caused by
>                 the `WriteFiles` sharding logic. It assigns keys and
>                 does a Reshuffle
>                 which apparently does not lead to good data spread in
>                 your case.
> 
>                 Do you see the same behavior without `withNumShards(5)`?
> 
>                 Thanks,
>                 Max
> 
>                 On 22.10.18 11:57, Jozef Vilcek wrote:
>                  > Hello,
>                  >
>                  > I am having some trouble to get a balanced write via
>                 FileIO. Workers at
>                  > the shuffle side where data per window fire are
>                 written to the
>                  > filesystem receive unbalanced number of events.
>                  >
>                  > Here is a naive code example:
>                  >
>                  >      val read = KafkaIO.read()
>                  >          .withTopic("topic")
>                  >          .withBootstrapServers("kafka1:9092")
>                  >         
>                 .withKeyDeserializer(classOf[ByteArrayDeserializer])
>                  >         
>                 .withValueDeserializer(classOf[ByteArrayDeserializer])
>                  >          .withProcessingTime()
>                  >
>                  >      pipeline
>                  >          .apply(read)
>                  >          .apply(MapElements.via(new
>                  > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]],
>                 String]() {
>                  >            override def apply(input:
>                 KafkaRecord[Array[Byte],
>                  > Array[Byte]]): String = {
>                  >              new String(input.getKV.getValue, "UTF-8")
>                  >            }
>                  >          }))
>                  >
>                  >
>                  >
>                 .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>                  >              .triggering(AfterWatermark.pastEndOfWindow()
>                  >                 
>                 .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>                  >                 
>                 .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>                  >                   
>                 Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>                  >
>                  >
>                 Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>                  >              .discardingFiredPanes()
>                  >             
>                 .withAllowedLateness(Duration.standardDays(7)))
>                  >
>                  >          .apply(FileIO.write()
>                  >              .via(TextIO.sink())
>                  >              .withNaming(new
>                 SafeFileNaming(outputPath, ".txt"))
>                  >              .withTempDirectory(tempLocation)
>                  >              .withNumShards(5))
>                  >
>                  >
>                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5
>                 workers (equal to
>                  > number of shards), I would expect that each worker
>                 will participate on
>                  > persisting shards and equally, since code uses fixed
>                 number of shards
>                  > (and random shard assign?). But reality is different
>                 (see 2 attachments
>                  > - statistics from flink task reading from kafka and
>                 task writing to files)
>                  >
>                  > What am I missing? How to achieve balanced writes?
>                  >
>                  > Thanks,
>                  > Jozef
>                  >
>                  >
> 

Re: Unbalanced FileIO writes on Flink

Posted by Jozef Vilcek <jo...@gmail.com>.
So if I run 5 workers with 50 shards, I end up with:

Duration Bytes received Records received
 2m 39s         900 MB             465,525
 2m 39s        1.76 GB             930,720
 2m 39s         789 MB             407,315
 2m 39s        1.32 GB             698,262
 2m 39s         788 MB             407,310

Still not good, but better than with 5 shards, where some workers did not
participate at all.
So the problem is in some layer which distributes keys / shards among workers?
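
One way to read the table above, as a rough sketch: if every shard carries
about the same number of records (an assumption, not something measured
here), the per-worker record counts can be turned into an estimated number
of shards per worker. The object and method names are made up for
illustration; the numbers are copied from the table.

```scala
object ShardsPerWorker {
  // Records received per worker, copied from the table above.
  val records: Vector[Long] = Vector(465525L, 930720L, 407315L, 698262L, 407310L)
  val numShards = 50

  // Assumption: each shard receives roughly the same number of records,
  // so a worker's share of records reflects its share of shards.
  def estimate(records: Vector[Long], numShards: Int): Vector[Int] = {
    val perShard = records.sum.toDouble / numShards
    records.map(r => math.round(r / perShard).toInt)
  }

  def main(args: Array[String]): Unit =
    println(estimate(records, numShards).mkString(", ")) // prints 8, 16, 7, 12, 7
}
```

Under that assumption the 50 shards split 8 / 16 / 7 / 12 / 7 across the
5 workers, which would point at uneven shard placement rather than uneven
shard sizes.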

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:

> withNumShards(5) generates 5 random shards. It turns out that
> statistically when you generate 5 random shards and you have 5 workers, the
> probability is reasonably high that some workers will get more than one
> shard (and as a result not all workers will participate). Are you able to
> set the number of shards larger than 5?
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with FlinkRunner in batch mode and received
>> again bad data spread among the workers.
>>
>> When I tried to remove number of shards for batch mode in above example,
>> pipeline crashed before launch
>>
>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>> incompatible triggers:
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
>> entCountAtLeast(10000)),
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
>> hour)))),
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
>> rever(AfterPane.elementCountAtLeast(1)),
>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>>
>>
>>
>>
>>
>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> Hi Max,
>>>
>>> I forgot to mention that example is run in streaming mode, therefore I
>>> can not do writes without specifying shards. FileIO explicitly asks for
>>> them.
>>>
>>> I am not sure where the problem is. FlinkRunner is only one I used.
>>>
>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> Hi Jozef,
>>>>
>>>> This does not look like a FlinkRunner related problem, but is caused by
>>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>>>> which apparently does not lead to good data spread in your case.
>>>>
>>>> Do you see the same behavior without `withNumShards(5)`?
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>>> > Hello,
>>>> >
>>>> > I am having some trouble to get a balanced write via FileIO. Workers
>>>> at
>>>> > the shuffle side where data per window fire are written to the
>>>> > filesystem receive unbalanced number of events.
>>>> >
>>>> > Here is a naive code example:
>>>> >
>>>> >      val read = KafkaIO.read()
>>>> >          .withTopic("topic")
>>>> >          .withBootstrapServers("kafka1:9092")
>>>> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>>> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>>>> >          .withProcessingTime()
>>>> >
>>>> >      pipeline
>>>> >          .apply(read)
>>>> >          .apply(MapElements.via(new
>>>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>>> >            override def apply(input: KafkaRecord[Array[Byte],
>>>> > Array[Byte]]): String = {
>>>> >              new String(input.getKV.getValue, "UTF-8")
>>>> >            }
>>>> >          }))
>>>> >
>>>> >
>>>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>>> >              .triggering(AfterWatermark.pastEndOfWindow()
>>>> >
>>>> .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>>> >
>>>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>>> >
>>>> Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>>> >
>>>> >
>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>>>> >              .discardingFiredPanes()
>>>> >              .withAllowedLateness(Duration.standardDays(7)))
>>>> >
>>>> >          .apply(FileIO.write()
>>>> >              .via(TextIO.sink())
>>>> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>>> >              .withTempDirectory(tempLocation)
>>>> >              .withNumShards(5))
>>>> >
>>>> >
>>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>>>> > number of shards), I would expect that each worker will participate
>>>> on
>>>> > persisting shards and equally, since code uses fixed number of shards
>>>> > (and random shard assign?). But reality is different (see 2
>>>> attachments
>>>> > - statistics from flink task reading from kafka and task writing to
>>>> files)
>>>> >
>>>> > What am I missing? How to achieve balanced writes?
>>>> >
>>>> > Thanks,
>>>> > Jozef
>>>> >
>>>> >
>>>>
>>>

Re: Unbalanced FileIO writes on Flink

Posted by Reuven Lax <re...@google.com>.
withNumShards(5) generates 5 random shards. It turns out that statistically
when you generate 5 random shards and you have 5 workers, the probability is
reasonably high that some workers will get more than one shard (and as a
result not all workers will participate). Are you able to set the number of
shards larger than 5?
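
To put a number on that, here is a small sketch under a simplifying
assumption: each shard lands on a worker uniformly and independently at
random (the real placement goes through the runner's hashing, so this is
only a model). The names ShardCoverage, choose and allWorkersBusy are made
up for illustration. For 5 shards on 5 workers the chance that every worker
gets a shard is 5!/5^5, about 3.8%; with 50 shards it is above 99.9%.

```scala
object ShardCoverage {
  // Binomial coefficient C(n, k) as a Double (fine for small n).
  def choose(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  // Probability that each of `workers` receives at least one of `shards`
  // shards, assuming uniform, independent placement. Uses
  // inclusion-exclusion over the number j of workers left empty.
  def allWorkersBusy(shards: Int, workers: Int): Double =
    (0 to workers).map { j =>
      val sign = if (j % 2 == 0) 1.0 else -1.0
      sign * choose(workers, j) * math.pow((workers - j).toDouble / workers, shards)
    }.sum

  def main(args: Array[String]): Unit = {
    println(f"5 shards,  5 workers: ${allWorkersBusy(5, 5)}%.4f")
    println(f"50 shards, 5 workers: ${allWorkersBusy(50, 5)}%.4f")
  }
}
```

Note that this only covers whether every worker participates; it says
nothing about how evenly the load is balanced once they all do.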

On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jo...@gmail.com> wrote:

> cc (dev)
>
> I tried to run the example with FlinkRunner in batch mode and received
> again bad data spread among the workers.
>
> When I tried to remove number of shards for batch mode in above example,
> pipeline crashed before launch
>
> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
> incompatible triggers:
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> entCountAtLeast(10000)),
> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> hour)))),
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> rever(AfterPane.elementCountAtLeast(1)),
> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>
>
>
>
>
> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jo...@gmail.com>
> wrote:
>
>> Hi Max,
>>
>> I forgot to mention that example is run in streaming mode, therefore I
>> can not do writes without specifying shards. FileIO explicitly asks for
>> them.
>>
>> I am not sure where the problem is. FlinkRunner is only one I used.
>>
>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Hi Jozef,
>>>
>>> This does not look like a FlinkRunner related problem, but is caused by
>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>>> which apparently does not lead to good data spread in your case.
>>>
>>> Do you see the same behavior without `withNumShards(5)`?
>>>
>>> Thanks,
>>> Max
>>>
>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > I am having some trouble to get a balanced write via FileIO. Workers
>>> at
>>> > the shuffle side where data per window fire are written to the
>>> > filesystem receive unbalanced number of events.
>>> >
>>> > Here is a naive code example:
>>> >
>>> >      val read = KafkaIO.read()
>>> >          .withTopic("topic")
>>> >          .withBootstrapServers("kafka1:9092")
>>> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>>> >          .withProcessingTime()
>>> >
>>> >      pipeline
>>> >          .apply(read)
>>> >          .apply(MapElements.via(new
>>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>> >            override def apply(input: KafkaRecord[Array[Byte],
>>> > Array[Byte]]): String = {
>>> >              new String(input.getKV.getValue, "UTF-8")
>>> >            }
>>> >          }))
>>> >
>>> >
>>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>> >              .triggering(AfterWatermark.pastEndOfWindow()
>>> >
>>> .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>> >
>>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>> >
>>> Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>> >
>>> >
>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
>>> >              .discardingFiredPanes()
>>> >              .withAllowedLateness(Duration.standardDays(7)))
>>> >
>>> >          .apply(FileIO.write()
>>> >              .via(TextIO.sink())
>>> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>> >              .withTempDirectory(tempLocation)
>>> >              .withNumShards(5))
>>> >
>>> >
>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>>> > number of shards), I would expect that each worker will participate on
>>> > persisting shards and equally, since code uses fixed number of shards
>>> > (and random shard assign?). But reality is different (see 2
>>> attachments
>>> > - statistics from flink task reading from kafka and task writing to
>>> files)
>>> >
>>> > What am I missing? How to achieve balanced writes?
>>> >
>>> > Thanks,
>>> > Jozef
>>> >
>>> >
>>>
>>