Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2017/11/16 19:51:00 UTC

[jira] [Created] (BEAM-3203) Spec out what mutations are allowed to a constructed model pipeline, particularly coders

Kenneth Knowles created BEAM-3203:
-------------------------------------

             Summary: Spec out what mutations are allowed to a constructed model pipeline, particularly coders
                 Key: BEAM-3203
                 URL: https://issues.apache.org/jira/browse/BEAM-3203
             Project: Beam
          Issue Type: Improvement
          Components: beam-model
            Reporter: Kenneth Knowles
            Assignee: Kenneth Knowles


Context: presume an SDK has constructed a pipeline or sub-pipeline, and sent it - as a model proto - to another party, which could be a runner or another SDK.

Question to be resolved: What mutations are allowed to this pipeline?

For example, depending on how an SDK harness is implemented, some coders (aka wire formats) can be swapped while leaving the language-level types compatible, such as "urn:beam:coder:varlong" and "urn:beam:coder:bigendianlong". It may also be possible to add or remove length prefixes in some situations.
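To make the swap concrete, here is a minimal sketch of two wire formats that carry the same language-level integer. The function names are hypothetical and the encodings are only illustrative stand-ins for the two URNs above (a base-128 varint and an 8-byte big-endian long), not the SDK's actual coder implementations.

```python
import struct

MASK_64 = 0xFFFFFFFFFFFFFFFF


def encode_varlong(value: int) -> bytes:
    """Base-128 varint of a 64-bit value, least-significant group first."""
    value &= MASK_64  # treat the value as unsigned 64-bit
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            out.append(bits | 0x80)  # more groups follow
        else:
            out.append(bits)
            return bytes(out)


def decode_varlong(data: bytes) -> int:
    result = 0
    shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            break
    # Reinterpret the unsigned 64-bit result as a signed long.
    if result >= 1 << 63:
        result -= 1 << 64
    return result


def encode_bigendianlong(value: int) -> bytes:
    """Fixed-width 8-byte big-endian signed long."""
    return struct.pack(">q", value)


def decode_bigendianlong(data: bytes) -> int:
    return struct.unpack(">q", data)[0]
```

Both pairs round-trip the same Python {{int}}, so a UDF consuming the decoded value cannot tell which wire format was used. Only the bytes on the wire differ.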

What we mean by _coder_ is a wire format specification for a _stream_ of elements, specified by a {{FunctionSpec}} proto and its component coders (and so on recursively).

For many coders, if the encoding is not known to a party, then the boundaries of elements cannot be discerned. But there are lots of situations where boundaries need to be known without full decoding - particularly by runners, but also at some point for SDK-to-SDK transmission.

*Possibility 1*: insist that a coder...

{code}
Coder {
  spec: FunctionSpec { urn: "beam:coder:my_whatever_coder" }
}
{code}

... is always allowed to be replaced by the same coder, wrapped with an added length prefix ...

{code}
Coder {
  spec: FunctionSpec { urn: "beam:coder:add_length_prefix" }
  component_coders: [
    Coder {
      spec: FunctionSpec { urn: "beam:coder:my_whatever_coder" }
    }
  ]
}
{code}
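As a rough illustration of why the wrapped form helps, the sketch below shows a party splitting a stream into elements without understanding the inner coder at all. It assumes the length prefix is a base-128 varint. The helper names are hypothetical and this is not taken from any SDK.

```python
from typing import List


def encode_varint(n: int) -> bytes:
    """Base-128 varint of a non-negative integer."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def add_length_prefix(encoded_element: bytes) -> bytes:
    """Wrap bytes produced by an opaque inner coder with their length."""
    return encode_varint(len(encoded_element)) + encoded_element


def split_elements(stream: bytes) -> List[bytes]:
    """Find element boundaries using only the length prefixes."""
    elements = []
    pos = 0
    while pos < len(stream):
        length = 0
        shift = 0
        while True:
            if pos >= len(stream):
                raise ValueError("truncated length prefix")
            byte = stream[pos]
            pos += 1
            length |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                break
        if pos + length > len(stream):
            raise ValueError("truncated element")
        # The payload stays opaque: it is sliced out, never decoded.
        elements.append(stream[pos:pos + length])
        pos += length
    return elements
```

The receiving party only ever inspects the prefixes. Each payload can be forwarded as-is to whichever SDK harness understands "beam:coder:my_whatever_coder".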

Each SDK harness would then be responsible for understanding this wrapped coder and for executing the same UDFs on the decoded values. This is already somewhat implicit in how the Fn API produces ProcessBundleDescriptors, since a runner can never assume that it understands SDK coders.

*Possibility 2*: allow optimization by indicating a way to determine element boundaries

It may be that even for a coder that cannot be understood, the element boundaries can be easily discerned. For example, if a coder _already_ puts a length prefix in a known format at the start of each element, you just need to pull that out. This means that for an unknown coder, you can save the computation and space of adding a length prefix. (If you can understand "urn:beam:coder:add_length_prefix", that special case is already handled.)

It might look something like this:

{code}
Coder {
  spec: FunctionSpec { urn: "beam:coder:my_whatever_coder" }
  also_decodes_as: Coder {
    spec: FunctionSpec { urn: "beam:coder:add_length_prefix" }
    component_coders: [
      Coder {
        spec: FunctionSpec { urn: "beam:coder:uninterpretable_bytes" }
      }
    ]
  }
}
{code}

The extra coder in {{also_decodes_as}} must be completely wire-compatible and should always be composed of completely standardized coders, so element boundaries can always be ascertained. An annoyance here is the possibility of silly protos where this recurses. Since the main implementation we expect is a length prefix, it could just be a flag, or just a coder for the length prefix itself.
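One way a receiving party might combine the two possibilities is sketched below. The {{Coder}} dataclass is a simplified stand-in for the model proto, {{also_decodes_as}} is the hypothetical field proposed above, and the function names and the "boundaries are discernible" rule are assumptions made for illustration only.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set

LENGTH_PREFIX_URN = "beam:coder:add_length_prefix"


@dataclass
class Coder:
    urn: str
    component_coders: List["Coder"] = field(default_factory=list)
    # Hypothetical field from Possibility 2; not part of any real proto.
    also_decodes_as: Optional["Coder"] = None


def boundaries_discernible(coder: Coder, known_urns: Set[str]) -> bool:
    """Assumed rule: can this party find element boundaries for `coder`?"""
    if coder.urn == LENGTH_PREFIX_URN:
        # The prefix alone gives boundaries; the component may be opaque.
        return LENGTH_PREFIX_URN in known_urns
    return coder.urn in known_urns and all(
        boundaries_discernible(c, known_urns) for c in coder.component_coders
    )


def coder_for_boundaries(coder: Coder, known_urns: Set[str]) -> Coder:
    """Pick a coder that this party can use to split the element stream."""
    if boundaries_discernible(coder, known_urns):
        return coder
    alt = coder.also_decodes_as
    if alt is not None and boundaries_discernible(alt, known_urns):
        # Possibility 2: same bytes on the wire, no extra prefix needed.
        return alt
    # Possibility 1: mutate the pipeline by wrapping with a length prefix.
    return Coder(LENGTH_PREFIX_URN, [coder])
```

In this sketch only the final fallback changes the wire format. The {{also_decodes_as}} branch is a pure reinterpretation of the same bytes, which is where the saving in computation and space would come from. Nested {{also_decodes_as}} hints on the alternate coder are deliberately ignored, which sidesteps the recursion concern.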



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)