Posted to dev@beam.apache.org by Daniel Collins <dp...@google.com> on 2021/09/09 14:00:36 UTC

Create bug or DirectRunner bug?

Hi all,

I'm running into a weird issue where the following code only runs 50
iterations, i.e. "Created message index {}" is only printed for numbers
1-50. When I change MESSAGE_COUNT to 90, it is only printed for numbers
1-39. This looks like a bug in either the DirectRunner or Create; does anyone
have any ideas?

-Daniel

int MESSAGE_COUNT = 100;
private static AtomicInteger createdCount = new AtomicInteger();
PCollection<Integer> indexes =
        pipeline.apply(
            "createIndexes",
            Create.of(IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList())));
PCollection<PubSubMessage> messages =
        indexes.apply(
            "createMessages",
            MapElements.via(
                new SimpleFunction<Integer, PubSubMessage>(
                    index -> {
                      System.err.println("Created message index " + createdCount.incrementAndGet());
                      return Message.builder()
                          .setData(ByteString.copyFromUtf8(index.toString()))
                          .build()
                          .toProto();
                    }) {}));
messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
messages.apply(
        "writeMessages",
        PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));

Re: Create bug or DirectRunner bug?

Posted by Kenneth Knowles <ke...@apache.org>.
Just want to double check: how are the non-logging aspects looking here? Do
you have 100 (respectively 90) elements in the created collection? How
about the output of the MapElements?

And from your level of familiarity, I assume you are comfortable with the
fact that on other runners there may be multiple VMs, containers, JVMs,
etc., processing arbitrary fractions of the 100 input elements. I guess the
debugging scenario is assuming that the DirectRunner is not doing anything
like this. (FWIW Brian's result sounds like multiple deserializations of
the same DoFn, so the local variable is re-initialized to 0)

Kenn
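The re-initialization described above can be reproduced without Beam. The JDK-only sketch below is an illustration under assumptions, not Beam's actual serialization path: it round-trips a hypothetical `CountingFn` through Java serialization twice, standing in for a runner deserializing the same function more than once. A local `AtomicInteger` captured by a serializable function travels with the function object just like the instance field here, so each copy counts up from the value it was serialized with (0), and the logged numbers repeat, as in Brian's non-static experiment. The static field is not part of the serialized bytes, so all copies in one JVM share it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a user function that a runner serializes and deserializes.
class CountingFn implements Serializable {
  // Static state: one counter per JVM, never written into the serialized bytes.
  static final AtomicInteger STATIC_COUNT = new AtomicInteger();
  // Instance state (behaves like a captured local): each deserialized copy gets its own.
  final AtomicInteger instanceCount = new AtomicInteger();

  // Returns the per-copy counter value, i.e. what a log line based on it would print.
  int process() {
    STATIC_COUNT.incrementAndGet();
    return instanceCount.incrementAndGet();
  }
}

class DeserializationDemo {
  // Java-serialization round trip, standing in for "ship the function to a worker".
  static CountingFn roundTrip(CountingFn fn) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fn);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CountingFn) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  // Processes 3 elements on one copy and 2 on another; returns the per-copy logged values.
  static List<Integer> run() {
    CountingFn original = new CountingFn();
    CountingFn copyA = roundTrip(original);
    CountingFn copyB = roundTrip(original);
    List<Integer> logged = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      logged.add(copyA.process());
    }
    for (int i = 0; i < 2; i++) {
      logged.add(copyB.process());
    }
    return logged;
  }

  public static void main(String[] args) {
    int staticBefore = CountingFn.STATIC_COUNT.get();
    List<Integer> logged = run();
    System.out.println("per-copy values: " + logged);
    System.out.println("static total: " + (CountingFn.STATIC_COUNT.get() - staticBefore));
  }
}
```

Running `main` prints `per-copy values: [1, 2, 3, 1, 2]` followed by `static total: 5`.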

On Fri, Sep 10, 2021 at 6:11 AM Daniel Collins <dp...@google.com> wrote:

> > Why are you setting isBlockOnRun to false?
>
> The above code is part of an integration test being added in
> https://github.com/apache/beam/pull/15418/files, which polls state to
> determine whether all expected messages sent into Pub/Sub Lite have been
> received. There are two flows in this pipeline: one publishing the
> messages, and the other subscribing to them and producing a streaming
> PCollection whose elements are recorded. The problem is that not all messages
> are being published to Pub/Sub Lite, because the transform after Create (the
> one with the log line mentioned above) is never called with them.

Re: Create bug or DirectRunner bug?

Posted by Daniel Collins <dp...@google.com>.
> Why are you setting isBlockOnRun to false?

The above code is part of an integration test being added in
https://github.com/apache/beam/pull/15418/files, which polls state to
determine whether all expected messages sent into Pub/Sub Lite have been
received. There are two flows in this pipeline: one publishing the
messages, and the other subscribing to them and producing a streaming
PCollection whose elements are recorded. The problem is that not all messages
are being published to Pub/Sub Lite, because the transform after Create (the
one with the log line mentioned above) is never called with them.
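For context on the `setBlockOnRun(false)` plus sleep-loop approach, here is a minimal sketch of a bounded polling loop in plain Java. `PollUntil.waitUntil` and the `received` counter are hypothetical names, not code from the PR above; the idea is that the loop gives up after a deadline and can report how many messages actually arrived, instead of sleeping indefinitely.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class PollUntil {
  // Hypothetical helper: checks `condition` every `interval` until it holds or `timeout` elapses.
  // Returns true if the condition held in time, false on timeout or interruption.
  static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) { // overflow-safe deadline comparison
        return false;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    final int expected = 100;
    // Stand-in for the state the subscribing flow records as messages arrive.
    AtomicInteger received = new AtomicInteger();
    Thread subscriber = new Thread(() -> {
      for (int i = 0; i < expected; i++) {
        received.incrementAndGet();
      }
    });
    subscriber.start();
    boolean done =
        waitUntil(() -> received.get() >= expected, Duration.ofSeconds(10), Duration.ofMillis(50));
    subscriber.join();
    System.out.println(done ? "all " + expected + " received" : "timed out at " + received.get());
  }
}
```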

On Thu, Sep 9, 2021 at 11:36 PM Reuven Lax <re...@google.com> wrote:

> AFAICT that isStreaming setting isn't referenced anywhere in DirectRunner.
>
> Why are you setting isBlockOnRun to false?

Re: Create bug or DirectRunner bug?

Posted by Reuven Lax <re...@google.com>.
AFAICT that isStreaming setting isn't referenced anywhere in DirectRunner.

Why are you setting isBlockOnRun to false?

On Thu, Sep 9, 2021 at 8:19 PM Daniel Collins <dp...@google.com> wrote:

> > In your example, is createdCount a private static member of a class that
> > contains the function for defining the pipeline?
>
> Yes, sorry that that was unclear. This is a private static member of the
> class. https://issues.apache.org/jira/browse/BEAM-12867 should provide
> code making this more clear.
>
> I think this may have to do with the streaming setting? This is how my
> pipeline is initialized:
>
> @Rule public transient TestPipeline pipeline = TestPipeline.create();
>
> @Test
> public void testReadWrite() throws Exception {
>   pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
>   pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
>   applyTransform(pipeline);
>   pipeline.run();
>   /* sleep loop */
> }

Re: Create bug or DirectRunner bug?

Posted by Daniel Collins <dp...@google.com>.
> In your example, is createdCount a private static member of a class that
> contains the function for defining the pipeline?

Yes, sorry that that was unclear. This is a private static member of the
class. https://issues.apache.org/jira/browse/BEAM-12867 should provide code
making this more clear.

I think this may have to do with the streaming setting? This is how my
pipeline is initialized:

@Rule public transient TestPipeline pipeline = TestPipeline.create();

@Test
public void testReadWrite() throws Exception {
  pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
  pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
  applyTransform(pipeline);
  pipeline.run();
  /* sleep loop */
}

On Thu, Sep 9, 2021 at 5:57 PM Brian Hulette <bh...@google.com> wrote:

> In your example, is createdCount a private static member of a class that
> contains the function for defining the pipeline? I don't think the code is
> valid as written.
>
> I tried to reproduce what you're seeing, but I'm not able to. If I make
> the AtomicInteger a private static member of the class as described above,
> it works as I'd expect, logging 0-99 (with COUNT=100), mostly in order.
> If I make the AtomicInteger a (non-static) variable in the method I do see
> surprising output: numbers from 0-17, with many duplicates.
>
> Brian

Re: Create bug or DirectRunner bug?

Posted by Brian Hulette <bh...@google.com>.
In your example, is createdCount a private static member of a class that
contains the function for defining the pipeline? I don't think the code is
valid as written.

I tried to reproduce what you're seeing, but I'm not able to. If I make
the AtomicInteger a private static member of the class as described above,
it works as I'd expect, logging 0-99 (with COUNT=100), mostly in order.
If I make the AtomicInteger a (non-static) variable in the method I do see
surprising output: numbers from 0-17, with many duplicates.

Brian


On Thu, Sep 9, 2021 at 9:05 AM Daniel Collins <dp...@google.com> wrote:

> This variable is static and atomic, so it is safe to modify from a
> transform running on the DirectRunner, and is unrelated to the issue I'm seeing here.

Re: Create bug or DirectRunner bug?

Posted by Daniel Collins <dp...@google.com>.
This variable is static and atomic, so it is safe to modify from a
transform running on the DirectRunner, and is unrelated to the issue I'm seeing here.
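This holds within a single JVM, which is where the DirectRunner runs: a static `AtomicInteger` is one counter per JVM (strictly, per class loader), and concurrent increments are not lost. Below is a minimal JDK-only sketch, with a thread pool standing in for the runner's worker threads; `StaticAtomicCounter` is a hypothetical name. As Kenn's reply points out, on runners with multiple JVMs each JVM would have its own copy of the static field.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StaticAtomicCounter {
  // One counter for the whole JVM (per class loader), shared by all threads.
  static final AtomicInteger CREATED = new AtomicInteger();

  // Runs `tasks` increments on a pool of `threads` threads; returns how many were recorded.
  static int countConcurrently(int threads, int tasks) {
    int before = CREATED.get();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        CREATED.incrementAndGet(); // atomic read-modify-write, so no update is lost
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("pool did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return CREATED.get() - before;
  }

  public static void main(String[] args) {
    System.out.println(countConcurrently(8, 100)); // 100
  }
}
```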

On Thu, Sep 9, 2021 at 11:46 AM Kyle Weaver <kc...@google.com> wrote:

> Mutating variables like createdCount that are defined outside of the Beam
> pipeline from within a DoFn is unsafe. In this case you could use Beam's
> built-in Count transform instead.

Re: Create bug or DirectRunner bug?

Posted by Kyle Weaver <kc...@google.com>.
Mutating variables like createdCount that are defined outside of the Beam
pipeline from within a DoFn is unsafe. In this case you could use Beam's
built-in Count transform instead.
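The suggestion here is to let the pipeline produce the count as data (in Beam, `Count.globally()` yields a single-element `PCollection<Long>`) instead of bumping a counter from inside the mapping function. The sketch below is only a JDK-streams analogy of that idea, not Beam code: the count is a reduction over the mapped elements, so it does not depend on which thread, or which copy of the function, handled each element. `toMessage` is a hypothetical stand-in for the `createMessages` step.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CountAsData {
  // Hypothetical stand-in for the "createMessages" step: index -> payload.
  static String toMessage(int index) {
    return Integer.toString(index);
  }

  // Counts the produced messages as a reduction over the stream, rather than
  // incrementing shared state from inside toMessage.
  static long countMessages(int messageCount) {
    return IntStream.range(0, messageCount)
        .parallel()
        .mapToObj(CountAsData::toMessage)
        .collect(Collectors.counting());
  }

  public static void main(String[] args) {
    System.out.println(countMessages(100)); // 100
    System.out.println(countMessages(90)); // 90
  }
}
```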

On Thu, Sep 9, 2021 at 7:00 AM Daniel Collins <dp...@google.com> wrote:

> Hi all,
>
> I'm running into a weird issue where the following code only runs 50
> iterations, i.e. "Created message index {}" is only printed for numbers
> 1-50. When I change MESSAGE_COUNT to 90, it is only printed for numbers
> 1-39. This looks like a bug in either the DirectRunner or Create; does anyone
> have any ideas?
>
> -Daniel