Posted to commits@beam.apache.org by re...@apache.org on 2019/03/21 15:56:09 UTC

[beam] branch master updated: [BEAM-5638] Exception handling for Java MapElements and FlatMapElements

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b12dafd  [BEAM-5638] Exception handling for Java MapElements and FlatMapElements
     new 261b401  Merge pull request #7736: [BEAM-5638] Exception handling for Java MapElements and FlatMapElements
b12dafd is described below

commit b12dafd52382b131794e64260080b2ac5e303d3e
Author: Jeff Klukas <je...@klukas.net>
AuthorDate: Fri Feb 1 12:28:48 2019 -0500

    [BEAM-5638] Exception handling for Java MapElements and FlatMapElements
    
    Previously attempted in #6586, this change incorporates feedback from that
    review and adds some additional convenience. The most important change is
    that the interface now expects users to provide an exception handler
    function that outputs elements that are efficiently codable, as originally
    suggested by @tims.
    
    Common features have been implemented in a new `WithFailures` class, and
    its top-level documentation is the best place to look for an overview of
    the approach.
    
    This PR does not address Filter, AsJsons, and ParseJsons. Those could be added
    in a future PR.
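    
    A minimal sketch of the resulting API (assuming an input
    `PCollection<String>` named `words`, mirroring the javadoc examples in the
    diff below):
    
        Result<PCollection<Integer>, KV<String, Map<String, String>>> result = words.apply(
            MapElements
                .into(TypeDescriptors.integers())
                .via((String word) -> 1 / word.length())  // may throw ArithmeticException
                .exceptionsVia(new WithFailures.ExceptionAsMapHandler<String>() {}));
        PCollection<Integer> output = result.output();
        PCollection<KV<String, Map<String, String>>> failures = result.failures();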
---
 .../apache/beam/sdk/annotations/Experimental.java  |   3 +
 .../beam/sdk/transforms/FlatMapElements.java       | 194 +++++++++++++++++++++
 .../apache/beam/sdk/transforms/MapElements.java    | 190 ++++++++++++++++++++
 .../apache/beam/sdk/transforms/WithFailures.java   | 182 +++++++++++++++++++
 .../beam/sdk/transforms/FlatMapElementsTest.java   | 105 +++++++++++
 .../beam/sdk/transforms/MapElementsTest.java       | 122 +++++++++++++
 .../beam/sdk/transforms/WithFailuresTest.java      |  81 +++++++++
 7 files changed, 877 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 77dedb3..25e0a65 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -106,5 +106,8 @@ public @interface Experimental {
 
     /** PCollection Schema support in Beam. */
     SCHEMAS,
+
+    /** Experimental APIs related to exception handling in PTransforms. */
+    WITH_EXCEPTIONS,
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 77b17e4..286e8ab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -23,9 +23,13 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 
@@ -182,4 +186,194 @@ public class FlatMapElements<InputT, OutputT>
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Returns a new {@link FlatMapWithFailures} transform that catches exceptions raised while
+   * mapping elements, with the given type descriptor used for the failure collection but the
+   * exception handler yet to be specified using {@link
+   * FlatMapWithFailures#exceptionsVia(ProcessFunction)}.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
+   * WithFailures.Result}.
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public <NewFailureT> FlatMapWithFailures<InputT, OutputT, NewFailureT> exceptionsInto(
+      TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+    return new FlatMapWithFailures<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, failureTypeDescriptor);
+  }
+
+  /**
+   * Returns a new {@link FlatMapWithFailures} transform that catches exceptions raised while
+   * mapping elements, passing the raised exception instance and the input element being processed
+   * through the given {@code exceptionHandler} and emitting the result to a failure collection.
+   *
+   * <p>This method takes advantage of the type information provided by {@link InferableFunction},
+   * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
+   * WithFailures.Result}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * Result<PCollection<String>, KV<String, Map<String, String>>> result = words.apply(
+   *     FlatMapElements
+   *         .into(TypeDescriptors.strings())
+   *         // Could throw ArrayIndexOutOfBoundsException
+   *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+   *         .exceptionsVia(new WithFailures.ExceptionAsMapHandler<String>() {}));
+   * PCollection<String> output = result.output();
+   * PCollection<KV<String, Map<String, String>>> failures = result.failures();
+   * }</pre>
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public <FailureT> FlatMapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
+      InferableFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+    return new FlatMapWithFailures<>(
+        fn,
+        originalFnForDisplayData,
+        inputType,
+        outputType,
+        exceptionHandler,
+        exceptionHandler.getOutputTypeDescriptor());
+  }
+
+  /** A {@code PTransform} that adds exception handling to {@link FlatMapElements}. */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public static class FlatMapWithFailures<InputT, OutputT, FailureT>
+      extends PTransform<PCollection<InputT>, WithFailures.Result<PCollection<OutputT>, FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler;
+
+    FlatMapWithFailures(
+        @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a new {@link FlatMapWithFailures} transform that catches exceptions raised while
+     * mapping elements, passing the raised exception instance and the input element being processed
+     * through the given {@code exceptionHandler} and emitting the result to a failure collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<String>, String> result = words.apply(
+     *     FlatMapElements
+     *         .into(TypeDescriptors.strings())
+     *         // Could throw ArrayIndexOutOfBoundsException
+     *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
+     *         .exceptionsInto(TypeDescriptors.strings())
+     *         .exceptionsVia((ExceptionElement<String> ee) -> ee.exception().getMessage()));
+     * PCollection<String> output = result.output();
+     * PCollection<String> failures = result.failures();
+     * }</pre>
+     */
+    public FlatMapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
+        ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+      return new FlatMapWithFailures<>(
+          fn, originalFnForDisplayData, inputType, outputType, exceptionHandler, failureType);
+    }
+
+    @Override
+    public WithFailures.Result<PCollection<OutputT>, FailureT> expand(PCollection<InputT> input) {
+      checkArgument(exceptionHandler != null, ".exceptionsVia() is required");
+      MapFn doFn = new MapFn();
+      PCollectionTuple tuple =
+          input.apply(
+              FlatMapWithFailures.class.getSimpleName(),
+              ParDo.of(doFn)
+                  .withOutputTags(doFn.outputTag, TupleTagList.of(doFn.failureTag))
+                  .withSideInputs(this.fn.getRequirements().getSideInputs()));
+      return WithFailures.Result.of(tuple, doFn.outputTag, doFn.failureTag);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
+      if (originalFnForDisplayData instanceof HasDisplayData) {
+        builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+      }
+      builder.add(DisplayData.item("exceptionHandler.class", exceptionHandler.getClass()));
+      if (exceptionHandler instanceof HasDisplayData) {
+        builder.include("exceptionHandler", (HasDisplayData) exceptionHandler);
+      }
+    }
+
+    /** A concrete TupleTag that allows coder inference based on failureType. */
+    private class FailureTag extends TupleTag<FailureT> {
+      @Override
+      public TypeDescriptor<FailureT> getTypeDescriptor() {
+        return failureType;
+      }
+    }
+
+    /** A DoFn implementation that handles exceptions and outputs a secondary failure collection. */
+    private class MapFn extends DoFn<InputT, OutputT> {
+
+      final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
+      final TupleTag<FailureT> failureTag = new FailureTag();
+
+      @ProcessElement
+      public void processElement(@Element InputT element, MultiOutputReceiver r, ProcessContext c)
+          throws Exception {
+        boolean exceptionWasThrown = false;
+        Iterable<OutputT> res = null;
+        try {
+          res = fn.getClosure().apply(c.element(), Fn.Context.wrapProcessContext(c));
+        } catch (Exception e) {
+          exceptionWasThrown = true;
+          ExceptionElement<InputT> exceptionElement = ExceptionElement.of(element, e);
+          r.get(failureTag).output(exceptionHandler.apply(exceptionElement));
+        }
+        // We make sure our outputs occur outside the try block, since runners may implement
+        // fusion by having output() directly call the body of another DoFn, potentially catching
+        // exceptions unrelated to this transform.
+        if (!exceptionWasThrown) {
+          for (OutputT output : res) {
+            r.get(outputTag).output(output);
+          }
+        }
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.delegate(FlatMapWithFailures.this);
+      }
+
+      @Override
+      public TypeDescriptor<InputT> getInputTypeDescriptor() {
+        return inputType;
+      }
+
+      @Override
+      public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+        checkState(
+            outputType != null,
+            "%s output type descriptor was null; "
+                + "this probably means that getOutputTypeDescriptor() was called after "
+                + "serialization/deserialization, but it is only available prior to "
+                + "serialization, for constructing a pipeline and inferring coders",
+            FlatMapWithFailures.class.getSimpleName());
+        return outputType;
+      }
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index a9cd2b6..09fef37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
@@ -24,9 +25,13 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 
@@ -168,4 +173,189 @@ public class MapElements<InputT, OutputT>
       builder.include("fn", (HasDisplayData) originalFnForDisplayData);
     }
   }
+
+  /**
+   * Returns a new {@link MapWithFailures} transform that catches exceptions raised while mapping
+   * elements, with the given type descriptor used for the failure collection but the exception
+   * handler yet to be specified using {@link MapWithFailures#exceptionsVia(ProcessFunction)}.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
+   * WithFailures.Result}.
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public <NewFailureT> MapWithFailures<InputT, OutputT, NewFailureT> exceptionsInto(
+      TypeDescriptor<NewFailureT> failureTypeDescriptor) {
+    return new MapWithFailures<>(
+        fn, originalFnForDisplayData, inputType, outputType, null, failureTypeDescriptor);
+  }
+
+  /**
+   * Returns a new {@link MapWithFailures} transform that catches exceptions raised while mapping
+   * elements, passing the raised exception instance and the input element being processed through
+   * the given {@code exceptionHandler} and emitting the result to a failure collection.
+   *
+   * <p>This method takes advantage of the type information provided by {@link InferableFunction},
+   * meaning that a call to {@link #exceptionsInto(TypeDescriptor)} may not be necessary.
+   *
+   * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
+   * WithFailures.Result}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * Result<PCollection<Integer>, KV<String, Map<String, String>>> result = words.apply(
+   *     MapElements
+   *         .into(TypeDescriptors.integers())
+   *         .via((String word) -> 1 / word.length())  // Could throw ArithmeticException
+   *         .exceptionsVia(new WithFailures.ExceptionAsMapHandler<String>() {}));
+   * PCollection<Integer> output = result.output();
+   * PCollection<KV<String, Map<String, String>>> failures = result.failures();
+   * }</pre>
+   */
+  @Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+  public <FailureT> MapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
+      InferableFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+    return new MapWithFailures<>(
+        fn,
+        originalFnForDisplayData,
+        inputType,
+        outputType,
+        exceptionHandler,
+        exceptionHandler.getOutputTypeDescriptor());
+  }
+
+  /** A {@code PTransform} that adds exception handling to {@link MapElements}. */
+  @Experimental(Kind.WITH_EXCEPTIONS)
+  public static class MapWithFailures<InputT, OutputT, FailureT>
+      extends PTransform<PCollection<InputT>, WithFailures.Result<PCollection<OutputT>, FailureT>> {
+
+    private final transient TypeDescriptor<InputT> inputType;
+    private final transient TypeDescriptor<OutputT> outputType;
+    @Nullable private final transient TypeDescriptor<FailureT> failureType;
+    private final transient Object originalFnForDisplayData;
+    private final Contextful<Fn<InputT, OutputT>> fn;
+    @Nullable private final ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler;
+
+    MapWithFailures(
+        Contextful<Fn<InputT, OutputT>> fn,
+        Object originalFnForDisplayData,
+        TypeDescriptor<InputT> inputType,
+        TypeDescriptor<OutputT> outputType,
+        @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler,
+        @Nullable TypeDescriptor<FailureT> failureType) {
+      this.fn = fn;
+      this.originalFnForDisplayData = originalFnForDisplayData;
+      this.inputType = inputType;
+      this.outputType = outputType;
+      this.exceptionHandler = exceptionHandler;
+      this.failureType = failureType;
+    }
+
+    /**
+     * Returns a {@code PTransform} that catches exceptions raised while mapping elements, passing
+     * the raised exception instance and the input element being processed through the given {@code
+     * exceptionHandler} and emitting the result to a failure collection.
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * Result<PCollection<Integer>, String> result = words.apply(
+     *     MapElements
+     *         .into(TypeDescriptors.integers())
+     *         .via((String word) -> 1 / word.length())  // Could throw ArithmeticException
+     *         .exceptionsInto(TypeDescriptors.strings())
+     *         .exceptionsVia((ExceptionElement<String> ee) -> ee.exception().getMessage()));
+     * PCollection<Integer> output = result.output();
+     * PCollection<String> failures = result.failures();
+     * }</pre>
+     */
+    public MapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
+        ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
+      return new MapWithFailures<>(
+          fn, originalFnForDisplayData, inputType, outputType, exceptionHandler, failureType);
+    }
+
+    @Override
+    public WithFailures.Result<PCollection<OutputT>, FailureT> expand(PCollection<InputT> input) {
+      checkArgument(exceptionHandler != null, ".exceptionsVia() is required");
+      MapFn doFn = new MapFn();
+      PCollectionTuple tuple =
+          input.apply(
+              MapWithFailures.class.getSimpleName(),
+              ParDo.of(doFn)
+                  .withOutputTags(doFn.outputTag, TupleTagList.of(doFn.failureTag))
+                  .withSideInputs(this.fn.getRequirements().getSideInputs()));
+      return WithFailures.Result.of(tuple, doFn.outputTag, doFn.failureTag);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
+      if (originalFnForDisplayData instanceof HasDisplayData) {
+        builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+      }
+      builder.add(DisplayData.item("exceptionHandler.class", exceptionHandler.getClass()));
+      if (exceptionHandler instanceof HasDisplayData) {
+        builder.include("exceptionHandler", (HasDisplayData) exceptionHandler);
+      }
+    }
+
+    /** A concrete TupleTag that allows coder inference based on failureType. */
+    private class FailureTag extends TupleTag<FailureT> {
+      @Override
+      public TypeDescriptor<FailureT> getTypeDescriptor() {
+        return failureType;
+      }
+    }
+
+    /** A DoFn implementation that handles exceptions and outputs a secondary failure collection. */
+    private class MapFn extends DoFn<InputT, OutputT> {
+
+      final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
+      final TupleTag<FailureT> failureTag = new FailureTag();
+
+      @ProcessElement
+      public void processElement(@Element InputT element, MultiOutputReceiver r, ProcessContext c)
+          throws Exception {
+        boolean exceptionWasThrown = false;
+        OutputT result = null;
+        try {
+          result = fn.getClosure().apply(c.element(), Fn.Context.wrapProcessContext(c));
+        } catch (Exception e) {
+          exceptionWasThrown = true;
+          ExceptionElement<InputT> exceptionElement = ExceptionElement.of(element, e);
+          r.get(failureTag).output(exceptionHandler.apply(exceptionElement));
+        }
+        // We make sure our output occurs outside the try block, since runners may implement
+        // fusion by having output() directly call the body of another DoFn, potentially catching
+        // exceptions unrelated to this transform.
+        if (!exceptionWasThrown) {
+          r.get(outputTag).output(result);
+        }
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.delegate(MapWithFailures.this);
+      }
+
+      @Override
+      public TypeDescriptor<InputT> getInputTypeDescriptor() {
+        return inputType;
+      }
+
+      @Override
+      public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+        checkState(
+            outputType != null,
+            "%s output type descriptor was null; "
+                + "this probably means that getOutputTypeDescriptor() was called after "
+                + "serialization/deserialization, but it is only available prior to "
+                + "serialization, for constructing a pipeline and inferring coders",
+            MapWithFailures.class.getSimpleName());
+        return outputType;
+      }
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
new file mode 100644
index 0000000..357b1d4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/**
+ * A collection of utilities for writing transforms that can handle exceptions raised during
+ * processing of elements.
+ *
+ * <p>Consuming transforms such as {@link MapElements.MapWithFailures} follow the general pattern of
+ * taking in a user-defined exception handler of type {@code
+ * ProcessFunction<ExceptionElement<InputT>, FailureOutputT>} where the input {@link
+ * ExceptionElement} contains an exception along with the input element that was being processed
+ * when the exception was raised. This handler is responsible for producing some output element that
+ * captures relevant details of the failure and can be encoded as part of a failure output {@link
+ * PCollection}. Transforms can then package together their output and failure collections in a
+ * {@link WithFailures.Result} that avoids users needing to interact with {@code TupleTag}s and
+ * indexing into a {@link PCollectionTuple}.
+ *
+ * <p>Exception handlers can narrow their scope by rethrowing the passed {@link
+ * ExceptionElement#exception()} and catching only specific subclasses of {@code Exception}.
+ * Unhandled exceptions will generally bubble up to a top-level {@link
+ * org.apache.beam.sdk.Pipeline.PipelineExecutionException} that halts progress.
+ *
+ * <p>Users can take advantage of {@link Result#failuresTo(List)} for fluent chaining of transforms
+ * that handle exceptions:
+ *
+ * <pre>{@code
+ * PCollection<Integer> input = ...
+ * List<PCollection<Map<String, String>>> failureCollections = new ArrayList<>();
+ * input.apply(MapElements.via(...).exceptionsVia(...))
+ *      .failuresTo(failureCollections)
+ *      .apply(MapElements.via(...).exceptionsVia(...))
+ *      .failuresTo(failureCollections);
+ * PCollection<Map<String, String>> failures = PCollectionList.of(failureCollections)
+ *      .apply("FlattenFailureCollections", Flatten.pCollections());
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.WITH_EXCEPTIONS)
+public class WithFailures {
+
+  /**
+   * The value type passed as input to exception handlers. It wraps an exception together with the
+   * input element that was being processed at the time the exception was raised.
+   *
+   * <p>Exception handlers may want to re-raise the exception and catch only specific subclasses in
+   * order to limit the scope of handled exceptions or access subclass-specific data.
+   */
+  @AutoValue
+  public abstract static class ExceptionElement<T> {
+    public abstract T element();
+
+    public abstract Exception exception();
+
+    public static <T> ExceptionElement<T> of(T element, Exception exception) {
+      return new AutoValue_WithFailures_ExceptionElement<>(element, exception);
+    }
+  }
+
+  /**
+   * A simple handler that extracts information from an exception to a {@code Map<String, String>}
+   * and returns a {@link KV} where the key is the input element that failed processing, and the
+   * value is the map of exception attributes.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. Map and {@link KV}
+   * coders are well supported by Beam, so coder inference can be successfully applied if the
+   * consuming transform passes type information to the failure collection's {@link TupleTag}.
+   *
+   * <p>The keys populated in the map are "className", "message", and "stackTrace" of the exception.
+   */
+  public static class ExceptionAsMapHandler<T>
+      extends SimpleFunction<ExceptionElement<T>, KV<T, Map<String, String>>> {
+    @Override
+    public KV<T, Map<String, String>> apply(ExceptionElement<T> f) {
+      return KV.of(
+          f.element(),
+          ImmutableMap.of(
+              "className", f.exception().getClass().getName(),
+              "message", f.exception().getMessage(),
+              "stackTrace", Arrays.toString(f.exception().getStackTrace())));
+    }
+  }
+
+  /**
+   * An intermediate output type for PTransforms that allows an output collection to live alongside
+   * a collection of elements that failed the transform.
+   *
+   * @param <OutputT> Output type
+   * @param <FailureElementT> Element type for the failure {@code PCollection}
+   */
+  @AutoValue
+  public abstract static class Result<OutputT extends POutput, FailureElementT>
+      implements PInput, POutput {
+
+    public abstract OutputT output();
+
+    @Nullable
+    abstract TupleTag<?> outputTag();
+
+    public abstract PCollection<FailureElementT> failures();
+
+    abstract TupleTag<FailureElementT> failuresTag();
+
+    public static <OutputT extends POutput, FailureElementT> Result<OutputT, FailureElementT> of(
+        OutputT output, PCollection<FailureElementT> failures) {
+      return new AutoValue_WithFailures_Result<>(
+          output, null, failures, new TupleTag<FailureElementT>());
+    }
+
+    public static <OutputElementT, FailureElementT>
+        Result<PCollection<OutputElementT>, FailureElementT> of(
+            PCollection<OutputElementT> output, PCollection<FailureElementT> failures) {
+      return new AutoValue_WithFailures_Result<>(
+          output, new TupleTag<OutputElementT>(), failures, new TupleTag<FailureElementT>());
+    }
+
+    public static <OutputElementT, FailureElementT>
+        Result<PCollection<OutputElementT>, FailureElementT> of(
+            PCollectionTuple tuple,
+            TupleTag<OutputElementT> outputTag,
+            TupleTag<FailureElementT> failureTag) {
+      return new AutoValue_WithFailures_Result<>(
+          tuple.get(outputTag), outputTag, tuple.get(failureTag), failureTag);
+    }
+
+    /** Adds the failure collection to the passed list and returns just the output collection. */
+    public OutputT failuresTo(List<PCollection<FailureElementT>> failureCollections) {
+      failureCollections.add(failures());
+      return output();
+    }
+
+    @Override
+    public Pipeline getPipeline() {
+      return output().getPipeline();
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      Map<TupleTag<?>, PValue> values = new HashMap<>();
+      values.put(failuresTag(), failures());
+      if (outputTag() != null && output() instanceof PValue) {
+        values.put(outputTag(), (PValue) output());
+      }
+      return values;
+    }
+
+    @Override
+    public void finishSpecifyingOutput(
+        String transformName, PInput input, PTransform<?, ?> transform) {}
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 8fd8f85..bd7a7ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -27,11 +27,16 @@ import static org.junit.Assert.assertThat;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.FlatMapElements.FlatMapWithFailures;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionAsMapHandler;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
+import org.apache.beam.sdk.transforms.WithFailures.Result;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -327,4 +332,104 @@ public class FlatMapElementsTest implements Serializable {
       return ImmutableList.of(input, -input);
     }
   }
+
+  /** Test of {@link FlatMapWithFailures} with a pre-built exception handler. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExceptionAsMap() throws Exception {
+
+    Result<PCollection<Integer>, KV<Integer, Map<String, String>>> result =
+        pipeline
+            .apply(Create.of(0, 2, 3))
+            .apply(
+                FlatMapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> ImmutableList.of(i + 1 / i, -i - 1 / i))
+                    .exceptionsVia(new ExceptionAsMapHandler<Integer>() {}));
+
+    PAssert.that(result.output()).containsInAnyOrder(2, -2, 3, -3);
+    pipeline.run();
+  }
+
+  /** Test of {@link FlatMapWithFailures} with exception handling defined via a lambda expression. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFlatMapWithFailuresLambda() {
+    Result<PCollection<Integer>, KV<Integer, String>> result =
+        pipeline
+            .apply(Create.of(0, 2, 3))
+            .apply(
+                FlatMapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> ImmutableList.of(i + 1 / i, -i - 1 / i))
+                    .exceptionsInto(
+                        TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
+                    .exceptionsVia(f -> KV.of(f.element(), f.exception().getMessage())));
+
+    PAssert.that(result.output()).containsInAnyOrder(2, -2, 3, -3);
+    PAssert.that(result.failures()).containsInAnyOrder(KV.of(0, "/ by zero"));
+    pipeline.run();
+  }
+
+  /**
+   * Test of {@link FlatMapWithFailures} with a {@link SimpleFunction} exception handler and no {@code exceptionsInto} call.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFlatMapWithFailuresSimpleFunction() {
+    Result<PCollection<Integer>, KV<Integer, String>> result =
+        pipeline
+            .apply(Create.of(0, 2, 3))
+            .apply(
+                FlatMapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> ImmutableList.of(i + 1 / i, -i - 1 / i))
+                    .exceptionsVia(
+                        new SimpleFunction<ExceptionElement<Integer>, KV<Integer, String>>() {
+                          @Override
+                          public KV<Integer, String> apply(ExceptionElement<Integer> failure) {
+                            return KV.of(failure.element(), failure.exception().getMessage());
+                          }
+                        }));
+
+    PAssert.that(result.output()).containsInAnyOrder(2, -2, 3, -3);
+    PAssert.that(result.failures()).containsInAnyOrder(KV.of(0, "/ by zero"));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testFlatMapWithFailuresDisplayData() {
+    InferableFunction<Integer, List<Integer>> inferableFn =
+        new InferableFunction<Integer, List<Integer>>() {
+          @Override
+          public List<Integer> apply(Integer input) {
+            return ImmutableList.of(input);
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "baz"));
+          }
+        };
+
+    InferableFunction<ExceptionElement<Integer>, String> exceptionHandler =
+        new InferableFunction<ExceptionElement<Integer>, String>() {
+          @Override
+          public String apply(ExceptionElement<Integer> input) throws Exception {
+            return "";
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("bar", "buz"));
+          }
+        };
+
+    FlatMapWithFailures<?, ?, ?> mapWithFailures =
+        FlatMapElements.via(inferableFn).exceptionsVia(exceptionHandler);
+    assertThat(DisplayData.from(mapWithFailures), hasDisplayItem("class", inferableFn.getClass()));
+    assertThat(
+        DisplayData.from(mapWithFailures),
+        hasDisplayItem("exceptionHandler.class", exceptionHandler.getClass()));
+    assertThat(DisplayData.from(mapWithFailures), hasDisplayItem("foo", "baz"));
+    assertThat(DisplayData.from(mapWithFailures), hasDisplayItem("bar", "buz"));
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 952885a..727c1fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -23,15 +23,22 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.apache.beam.sdk.values.TypeDescriptors.integers;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Contextful.Fn;
+import org.apache.beam.sdk.transforms.MapElements.MapWithFailures;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
+import org.apache.beam.sdk.transforms.WithFailures.Result;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
@@ -39,6 +46,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -515,4 +523,118 @@ public class MapElementsTest implements Serializable {
       return val * 2;
     }
   }
+
+  /** Test of {@link MapWithFailures} using a pre-built exception handler. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExceptionAsMap() {
+    Result<PCollection<Integer>, KV<Integer, Map<String, String>>> result =
+        pipeline
+            .apply(Create.of(0, 1))
+            .apply(
+                MapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> 1 / i)
+                    .exceptionsVia(new WithFailures.ExceptionAsMapHandler<Integer>() {}));
+
+    PAssert.that(result.output()).containsInAnyOrder(1);
+
+    Map<String, String> expectedFailureInfo =
+        ImmutableMap.of("className", "java.lang.ArithmeticException");
+    PAssert.thatSingleton(result.failures())
+        .satisfies(
+            kv -> {
+              assertEquals(Integer.valueOf(0), kv.getKey());
+              assertThat(kv.getValue().entrySet(), hasSize(3));
+              assertThat(kv.getValue(), hasKey("stackTrace"));
+              assertEquals("java.lang.ArithmeticException", kv.getValue().get("className"));
+              assertEquals("/ by zero", kv.getValue().get("message"));
+              return null;
+            });
+
+    pipeline.run();
+  }
+
+  /** Test of {@link MapWithFailures} with exception handling defined via a lambda expression. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapWithFailuresLambda() {
+    Result<PCollection<Integer>, KV<Integer, String>> result =
+        pipeline
+            .apply(Create.of(0, 1))
+            .apply(
+                MapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> 1 / i)
+                    .exceptionsInto(
+                        TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
+                    .exceptionsVia(f -> KV.of(f.element(), f.exception().getMessage())));
+
+    PAssert.that(result.output()).containsInAnyOrder(1);
+
+    PAssert.that(result.failures()).containsInAnyOrder(KV.of(0, "/ by zero"));
+
+    pipeline.run();
+  }
+
+  /** Test of {@link MapWithFailures} with a {@link SimpleFunction} exception handler and no {@code exceptionsInto} call. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMapWithFailuresSimpleFunction() {
+    Result<PCollection<Integer>, KV<Integer, String>> result =
+        pipeline
+            .apply(Create.of(0, 1))
+            .apply(
+                MapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> 1 / i)
+                    .exceptionsVia(
+                        new SimpleFunction<ExceptionElement<Integer>, KV<Integer, String>>() {
+                          @Override
+                          public KV<Integer, String> apply(ExceptionElement<Integer> failure) {
+                            return KV.of(failure.element(), failure.exception().getMessage());
+                          }
+                        }));
+
+    PAssert.that(result.output()).containsInAnyOrder(1);
+
+    PAssert.that(result.failures()).containsInAnyOrder(KV.of(0, "/ by zero"));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testMapWithFailuresDisplayData() {
+    InferableFunction<Integer, Integer> inferableFn =
+        new InferableFunction<Integer, Integer>() {
+          @Override
+          public Integer apply(Integer input) {
+            return input;
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "baz"));
+          }
+        };
+
+    InferableFunction<ExceptionElement<Integer>, String> exceptionHandler =
+        new InferableFunction<ExceptionElement<Integer>, String>() {
+          @Override
+          public String apply(ExceptionElement<Integer> input) throws Exception {
+            return "";
+          }
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("bar", "buz"));
+          }
+        };
+
+    MapWithFailures<?, ?, ?> mapWithFailures =
+        MapElements.via(inferableFn).exceptionsVia(exceptionHandler);
+    assertThat(DisplayData.from(mapWithFailures), hasDisplayItem("class", inferableFn.getClass()));
+    assertThat(
+        DisplayData.from(mapWithFailures),
+        hasDisplayItem("exceptionHandler.class", exceptionHandler.getClass()));
+    assertThat(DisplayData.from(mapWithFailures), hasDisplayItem("foo", "baz"));
+    assertThat(DisplayData.from(mapWithFailures), hasDisplayItem("bar", "buz"));
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithFailuresTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithFailuresTest.java
new file mode 100644
index 0000000..5841674
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithFailuresTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.WithFailures.ExceptionAsMapHandler;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link WithFailures}. */
+@RunWith(JUnit4.class)
+public class WithFailuresTest implements Serializable {
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Test of {@link WithFailures.Result#failuresTo(List)}. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testExceptionAsMap() {
+    List<PCollection<KV<Integer, Map<String, String>>>> errorCollections = new ArrayList<>();
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(0, 1))
+            .apply(
+                MapElements.into(TypeDescriptors.integers())
+                    .via((Integer i) -> 1 / i)
+                    .exceptionsVia(new ExceptionAsMapHandler<Integer>() {}))
+            .failuresTo(errorCollections);
+
+    PAssert.that(output).containsInAnyOrder(1);
+
+    Map<String, String> expectedFailureInfo =
+        ImmutableMap.of("className", "java.lang.ArithmeticException");
+    PAssert.thatSingleton(PCollectionList.of(errorCollections).apply(Flatten.pCollections()))
+        .satisfies(
+            kv -> {
+              assertEquals(Integer.valueOf(0), kv.getKey());
+              assertThat(kv.getValue().entrySet(), hasSize(3));
+              assertThat(kv.getValue(), hasKey("stackTrace"));
+              assertEquals("java.lang.ArithmeticException", kv.getValue().get("className"));
+              assertEquals("/ by zero", kv.getValue().get("message"));
+              return null;
+            });
+
+    pipeline.run();
+  }
+}