Posted to user@beam.apache.org by Dmitrii Dimandt <dm...@dmitriid.com> on 2018/11/28 13:31:04 UTC

Update to protobuf results in failing MapElements step

Hi all.

This could be a strictly Google Dataflow question, but here goes:

I'm currently trying to update a stateful streaming job which basically looks as follows (using versions 2.7/2.8 of the SDKs):

    pipeline
        .apply("Read from pubsub", ...read from pubsub...)
        .apply(
            "Map PubSubMessages",
            MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(PubSubMessage.class)))
                .via(input -> KV.of(input.getId1() + ":" + input.getId2(), input))
        )
        .apply(...stateful processing...)

PubSubMessage is decoded/encoded using the autogenerated class:

    registry.registerCoderForClass(PubSubMessage.class, ProtoCoder.of(PubSubMessage.class));
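
For completeness, registry here is the pipeline's coder registry, i.e. roughly:

    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;

    // ProtoCoder becomes the default coder wherever PubSubMessage appears.
    CoderRegistry registry = pipeline.getCoderRegistry();
    registry.registerCoderForClass(PubSubMessage.class, ProtoCoder.of(PubSubMessage.class));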

The problem I'm having now is that the message has been updated: PubSubMessage now has two additional fields. Since it's proto3, the change is backwards compatible on the wire. However, since the generated class changed, it looks like Dataflow thinks the type has changed too, considers the old messages/steps no longer compatible, and fails to update the job with the following:

Workflow failed. Causes: The new job is not compatible with existing job. The original job has not been aborted., The Coder or type for step Map PubSubMessages to state object/Map.out0/FromValue has changed.

It would be a major-ish pain to stop the job (draining its messages) and restart it anew. Is there a better way to update the type/protobuf/coder in this case?
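
For reference, the update itself is attempted roughly like this (a minimal sketch; the job name is a placeholder):

    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setJobName("my-streaming-job"); // placeholder
    // Ask Dataflow to replace the running job of the same name in place,
    // preserving its state; the compatibility check quoted above happens here.
    options.setUpdate(true);
    Pipeline pipeline = Pipeline.create(options);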

Currently we think this could very well be the result of using MapElements.via with a lambda function.

According to Beam guidelines:

--- start quote ---
Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class’ state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.
--- end quote ---
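
Concretely, the situation the guide warns about looks something like this (a made-up illustration; the enclosing class and its heavyState field are hypothetical):

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class EnclosingPipeline implements Serializable {
        // Unrelated state that nevertheless gets serialized with the function,
        // because the anonymous inner class below captures EnclosingPipeline.this.
        private final Map<String, String> heavyState = new HashMap<>();

        PCollection<KV<String, PubSubMessage>> keyMessages(PCollection<PubSubMessage> in) {
            return in.apply(
                MapElements
                    .into(TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptor.of(PubSubMessage.class)))
                    .via(new SerializableFunction<PubSubMessage, KV<String, PubSubMessage>>() {
                        @Override
                        public KV<String, PubSubMessage> apply(PubSubMessage input) {
                            return KV.of(input.getId1() + ":" + input.getId2(), input);
                        }
                    }));
        }
    }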

To quote a colleague: "So perhaps the lambda itself is at fault. The wording 'The Coder or type for step' just makes me suspicious. If you click the mapping step in the working pipeline, it shows some metadata on the side: `com.storytel.consumption.aggregation.Program$$Lambda$30/20156341`. I really don't trust such information to be stable between builds, especially when the types change. But I could be barking up the wrong tree."
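
If that's right, one candidate workaround (a sketch only; KeyByIds is our own name, and we haven't verified that it passes Dataflow's update compatibility check) is to give the function a stable, named class. A SimpleFunction subclass also carries the type information itself, so .into(...) is no longer needed:

    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    // A named static class has a stable name across builds, unlike a
    // synthetic lambda name such as Program$$Lambda$30/20156341.
    static class KeyByIds extends SimpleFunction<PubSubMessage, KV<String, PubSubMessage>> {
        @Override
        public KV<String, PubSubMessage> apply(PubSubMessage input) {
            return KV.of(input.getId1() + ":" + input.getId2(), input);
        }
    }

    // Then, in place of the .into(...).via(lambda) pair above:
    .apply("Map PubSubMessages", MapElements.via(new KeyByIds()))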

If anyone has an insight, a suggestion, or a hint on how to resolve this problem, it would be greatly appreciated.