Posted to commits@beam.apache.org by "Aviem Zur (JIRA)" <ji...@apache.org> on 2017/01/19 13:07:26 UTC
[jira] [Resolved] (BEAM-1155) Spark runner aggregators only support
a handful of combiners
[ https://issues.apache.org/jira/browse/BEAM-1155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aviem Zur resolved BEAM-1155.
-----------------------------
Resolution: Won't Fix
Fix Version/s: Not applicable
> Spark runner aggregators only support a handful of combiners
> ------------------------------------------------------------
>
> Key: BEAM-1155
> URL: https://issues.apache.org/jira/browse/BEAM-1155
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Aviem Zur
> Assignee: Amit Sela
> Fix For: Not applicable
>
>
> Spark runner aggregators only support a handful of combiners.
> If the {{CombineFn}} used by an aggregator (in particular, a custom {{CombineFn}} written by the user) is not one of the classes checked in {{org.apache.beam.runners.spark.translation.SparkRuntimeContext#getCoder}}, the pipeline fails with an {{IllegalArgumentException}}.
> {code:java}
> private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
>   try {
>     if (combiner.getClass() == Sum.SumIntegerFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
>     } else if (combiner.getClass() == Sum.SumLongFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
>     } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
>     } else if (combiner.getClass() == Min.MinIntegerFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
>     } else if (combiner.getClass() == Min.MinLongFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
>     } else if (combiner.getClass() == Min.MinDoubleFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
>     } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
>     } else if (combiner.getClass() == Max.MaxLongFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
>     } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
>       return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
>     } else {
>       throw new IllegalArgumentException("unsupported combiner in Aggregator: "
>           + combiner.getClass().getName());
>     }
>   } catch (CannotProvideCoderException e) {
>     throw new IllegalStateException("Could not determine default coder for combiner", e);
>   }
> }
> {code}
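
The failure mode described above can be reproduced without Beam or Spark. The following is a minimal sketch only: the CombineFn interface, SumIntegerFn, ProductIntegerFn, valueTypeFor and isSupported are hypothetical stand-ins written for this illustration, not the Beam API. It mirrors the shape of getCoder (exact getClass() comparison, throw otherwise) to show that both a user-written combiner and even an anonymous subclass of a supported combiner fall through to the exception.

```java
public class ExactClassDispatchDemo {
    // Hypothetical stand-in for a combiner type; NOT Beam's Combine.CombineFn.
    interface CombineFn<T> {
        T combine(T a, T b);
    }

    // Stand-in for a built-in combiner that the dispatch knows about.
    static class SumIntegerFn implements CombineFn<Integer> {
        public Integer combine(Integer a, Integer b) {
            return a + b;
        }
    }

    // Stand-in for a user-written combiner that the dispatch has never seen.
    static class ProductIntegerFn implements CombineFn<Integer> {
        public Integer combine(Integer a, Integer b) {
            return a * b;
        }
    }

    // Same shape as getCoder in the issue: compare the exact runtime class
    // against a fixed list and throw for everything else.
    static Class<?> valueTypeFor(CombineFn<?> combiner) {
        if (combiner.getClass() == SumIntegerFn.class) {
            return Integer.class;
        }
        throw new IllegalArgumentException(
            "unsupported combiner in Aggregator: " + combiner.getClass().getName());
    }

    // Helper that reports whether the dispatch accepts the combiner.
    static boolean isSupported(CombineFn<?> combiner) {
        try {
            valueTypeFor(combiner);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Known class: accepted.
        System.out.println(isSupported(new SumIntegerFn()));
        // Custom combiner: rejected.
        System.out.println(isSupported(new ProductIntegerFn()));
        // Anonymous subclass of a known class has a different runtime class,
        // so the == comparison rejects it as well.
        System.out.println(isSupported(new SumIntegerFn() {}));
    }
}
```

Because the comparison is on the exact runtime class rather than on the combiner's declared types, any lookup of this shape can only cover the classes that are listed explicitly.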
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)