Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 03:57:45 UTC

[GitHub] [beam] kennknowles opened a new issue, #19564: Dynamic Writer - combining computed shards' number for late events with window's

kennknowles opened a new issue, #19564:
URL: https://github.com/apache/beam/issues/19564

   The runner attempts to combine the shard count computed for a window's on-time pane with the shard counts computed for subsequent panes containing late events, even when the window's accumulation mode is set to DISCARDING_FIRED_PANES. This results in an exception thrown by SingletonCombineFn.
   
   Steps to reproduce this behaviour:
    - create a dynamic writer with the `withSharding()` option
    - send a stream of messages to a Dataflow job via Pub/Sub
    - hold back *some* of the messages
    - let the rest of the messages flow to the job until the watermark passes the window's end
    - release the retained messages
   
   If all Pub/Sub traffic is halted and only released after the window's end, Beam does not attempt to combine the shard counts; the failure occurs only when just a part of the messages arrives as late events.
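   
   Below is a minimal sketch of the kind of windowing configuration these steps assume. The window size, allowed lateness, late-firing trigger, and the `messages` PCollection (the stream read from Pub/Sub) are illustrative assumptions, not values taken from the original job:
   ```
   import org.joda.time.Duration
   import org.apache.beam.sdk.transforms.windowing.{AfterPane, AfterWatermark, FixedWindows, Window}
   import org.apache.beam.sdk.values.PCollection
   
   // Fixed windows firing at the watermark, with late firings for late events.
   // DISCARDING_FIRED_PANES means each pane's shard count should stand on its own,
   // yet the runner still combines a late pane's count with the on-time pane's count.
   val windowed: PCollection[String] = messages.apply(
     Window.into[String](FixedWindows.of(Duration.standardMinutes(5)))
       .triggering(
         AfterWatermark.pastEndOfWindow()
           .withLateFirings(AfterPane.elementCountAtLeast(1)))
       .withAllowedLateness(Duration.standardMinutes(30))
       .discardingFiredPanes()
   )
   ```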
   
   Stacktrace:
   ```
   java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
       at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
       at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
       at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
       at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
       at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
       at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
       at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
       at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
       at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
       at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
       at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
       at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
       at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
       at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
       at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
       at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:745)
   ```
   
   Sharding implementation:
   ```
   // Imports added for completeness; the original snippet omitted them.
   import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
   import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}
   
   class RecordCountSharding[T](recordsPerShard: Int)
       extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
     import RecordCountSharding._
   
     override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
       // Count the elements of each pane; withoutDefaults() avoids emitting a count for empty windows.
       val count = input.apply(
         Combine.globally(Count.combineFn[T]()).withoutDefaults()
       )
   
       // Map the element count to a shard count.
       val shardsNum = count.apply(
         MapElements.into(TypeDescriptors.integers())
           .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count: java.lang.Long =>
             Integer.valueOf(getShardsNum(count, recordsPerShard))
           })
       )
   
       // Expose the shard count as a singleton side input; empty windows fall back to the default.
       shardsNum.apply(View.asSingleton[java.lang.Integer]().withDefaultValue(ShardsNumForEmptyWindows))
     }
   }
   
   object RecordCountSharding {
     val ShardsNumForEmptyWindows: java.lang.Integer = 0
   
     def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
       if (recordsPerShard <= 0) {
         throw new IllegalArgumentException(s"recordsPerShard must be greater than 0! Got $recordsPerShard")
       }
       new RecordCountSharding[T](recordsPerShard)
     }
   
     // E.g. 2500 records with 1000 records per shard -> ceil(2.5) = 3 shards.
     def getShardsNum(count: Long, recordsPerShard: Int): Int =
       (count.toFloat / recordsPerShard.toFloat).ceil.toInt
   }
   ```
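   
   For completeness, a hedged sketch of how this transform might be plugged into the dynamic writer. It assumes FileIO.writeDynamic() as the "dynamic writer" and reuses the `windowed` PCollection from the earlier sketch; the destination function, file naming, output path, and recordsPerShard value are placeholders, not the original job's configuration:
   ```
   import org.apache.beam.sdk.coders.StringUtf8Coder
   import org.apache.beam.sdk.io.{FileIO, TextIO}
   
   windowed.apply(
     FileIO.writeDynamic[String, String]()
       .by((line: String) => line.take(1))          // placeholder destination function
       .withDestinationCoder(StringUtf8Coder.of())
       .via(TextIO.sink())
       .to("gs://example-bucket/output")            // placeholder output path
       .withNaming((dest: String) => FileIO.Write.defaultNaming(dest, ".txt"))
       // The transform from the issue computes a shard count per window/pane
       // and exposes it as a singleton side input.
       .withSharding(RecordCountSharding[String](recordsPerShard = 1000))
   )
   ```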
   
   
   Imported from Jira [BEAM-7955](https://issues.apache.org/jira/browse/BEAM-7955). Original Jira may contain additional context.
   Reported by: mariusz.r.allegro.

