Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/07 18:04:25 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665587222



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;

Review comment:
       Noting that this namespace is for non-user-facing classes that have no backwards compatibility guarantees.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>

Review comment:
       I'm not sure if you know this, but you can do `new SimpleFunction<Foo, Bar>(<lambda>) {}` and the curly braces at the end cause it to be a subclass with type information preserved. I think having this class is fine, too. But if it is just boilerplate to preserve types then you might be able to use the inline anonymous subclass trick.
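
A minimal sketch of why the trailing `{}` matters, written in plain Java rather than against Beam. `TypedFunction` is a hypothetical stand-in for a wrapper like `SimpleFunction`, not Beam's class; the point is only that an anonymous subclass records its type arguments in its generic superclass, where reflection can read them back, while a direct instance does not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class AnonymousSubclassDemo {
  /** Hypothetical stand-in for a function wrapper; this is not Beam's SimpleFunction. */
  static class TypedFunction<InT, OutT> {
    private final Function<InT, OutT> fn;

    TypedFunction(Function<InT, OutT> fn) {
      this.fn = fn;
    }

    OutT apply(InT input) {
      return fn.apply(input);
    }

    /** Returns the output type argument if a subclass recorded it, or null otherwise. */
    Type outputType() {
      Type superType = getClass().getGenericSuperclass();
      if (superType instanceof ParameterizedType) {
        return ((ParameterizedType) superType).getActualTypeArguments()[1];
      }
      return null;
    }
  }

  /** Direct instance: the runtime class is TypedFunction itself, so nothing is recorded. */
  static Type plainOutputType() {
    TypedFunction<String, Integer> plain = new TypedFunction<>(String::length);
    return plain.outputType();
  }

  /** Anonymous subclass: the braces create a class whose superclass is TypedFunction<String, Integer>. */
  static Type anonymousOutputType() {
    TypedFunction<String, Integer> anonymous =
        new TypedFunction<String, Integer>(String::length) {};
    return anonymous.outputType();
  }

  public static void main(String[] args) {
    System.out.println(plainOutputType());
    System.out.println(anonymousOutputType());
  }
}
```

In this sketch only the anonymous instance can report `Integer` as its output type; whether that is enough to replace a named handler class depends on how the consuming transform looks up type information.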

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {
+  private Throwable throwable;
+
+  private ComparableThrowable() {
+    // Can't set this to null without adding a pointless @Nullable annotation to the field. It also
+    // needs to be set from the constructor to avoid a checkstyle violation.
+    this.throwable = new Throwable();
+  }
+
+  /** Wraps {@code throwable} and returns the result. */
+  public static ComparableThrowable forThrowable(Throwable throwable) {
+    ComparableThrowable comparable = new ComparableThrowable();
+    comparable.throwable = throwable;
+    return comparable;
+  }
+
+  /** Returns the underlying {@link Throwable}. */
+  public Throwable throwable() {
+    return throwable;
+  }
+
+  @Override
+  public int hashCode() {
+    return throwable.hashCode();
+  }
+
+  @Override
+  public boolean equals(@Nullable Object obj) {
+    if (!(obj instanceof ComparableThrowable)) {
+      return false;
+    }
+    Throwable other = ((ComparableThrowable) obj).throwable;
+
+    boolean currentLevelEqual =
+        throwable.getClass().isInstance(other)

Review comment:
       I think you want to check whether their classes are equal. That will be fine for mutation detection, etc., which does not really use equality except to check that nothing broke.
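
To illustrate the difference between the two checks, here is a small plain-Java sketch (the class and method names are made up for the example). `Class#isInstance` also accepts subclasses, so it is not symmetric, whereas comparing the classes directly is.

```java
public class ClassEqualityDemo {
  /** Mirrors the check in the diff: true when b is an instance of a's class or of a subclass. */
  static boolean instanceCheck(Throwable a, Throwable b) {
    return a.getClass().isInstance(b);
  }

  /** Exact class comparison, which gives the same answer in both directions. */
  static boolean sameClass(Throwable a, Throwable b) {
    return a.getClass().equals(b.getClass());
  }

  public static void main(String[] args) {
    Throwable general = new Exception("boom");
    Throwable specific = new IllegalStateException("boom");

    // IllegalStateException is an Exception, but not the other way around.
    System.out.println(instanceCheck(general, specific)); // true
    System.out.println(instanceCheck(specific, general)); // false

    // The exact comparison rejects the pair regardless of argument order.
    System.out.println(sameClass(general, specific)); // false
    System.out.println(sameClass(specific, general)); // false
  }
}
```

With the `isInstance` form, `a.equals(b)` and `b.equals(a)` can disagree, which breaks the symmetry that `equals` is expected to have.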

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+public class ComparableThrowable implements Serializable {

Review comment:
       Yeah, I was going to comment on this. It is OK for the name to be clunky, perhaps `ThrowableWithEquals`. The problem, of course, is that inheriting a concrete implementation of `equals` carries risk: subclasses will most likely not override `equals`, so if they carry additional information, the implementation they inherit will be incorrect. (This is mitigated when they implement an interface like `List` that specifies what equality means, so that equality is defined in terms of the interface.)
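
The risk with inherited `equals` can be sketched in a few lines of plain Java. `Failure` and `FailureWithCode` are hypothetical classes invented for this example; they are not part of the PR.

```java
import java.util.Objects;

public class InheritedEqualsDemo {
  /** Hypothetical base class with a concrete equals based only on the message. */
  static class Failure {
    final String message;

    Failure(String message) {
      this.message = message;
    }

    @Override
    public boolean equals(Object obj) {
      return obj instanceof Failure && Objects.equals(message, ((Failure) obj).message);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(message);
    }
  }

  /** Hypothetical subclass that adds state but does not override equals. */
  static class FailureWithCode extends Failure {
    final int code;

    FailureWithCode(String message, int code) {
      super(message);
      this.code = code;
    }
  }

  /** Two objects that differ in their extra field still compare equal via the inherited equals. */
  static boolean differingCodesCompareEqual() {
    return new FailureWithCode("boom", 1).equals(new FailureWithCode("boom", 2));
  }

  public static void main(String[] args) {
    System.out.println(differingCodesCompareEqual());
  }
}
```

The inherited method never looks at `code`, so the two instances are reported as equal even though they carry different information.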

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+public class ComparableThrowable implements Serializable {

Review comment:
       Having read the rest of the PR, I think the reason for this is to make it a friendly Beam element, so `SerializableThrowable` might be OK.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -881,8 +930,60 @@ public String toString() {
               getIdAttribute(),
               getNeedsAttributes(),
               getNeedsMessageId());
-      PCollection<T> read =
-          input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
+
+      PCollection<T> read;
+      PCollection<PubsubMessage> preParse = input.apply(source);
+      TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
+      if (getDeadLetterTopicProvider() == null) {
+        read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+      } else {
+        Result<PCollection<T>, KV<PubsubMessage, ComparableThrowable>> result =
+            preParse.apply(
+                "PubsubIO.Read/Map/Parse-Incoming-Messages",
+                MapElements.into(typeDescriptor)
+                    .via(getParseFn())
+                    .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
+        read = result.output();
+
+        // Write out failures to the provided dead-letter topic.
+        result
+            .failures()

Review comment:
       OK so seeing this, I _think_ there is no update problem because the throwable is never actually serialized.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  public static class ThrowableHandler<T>
+      extends SimpleFunction<ExceptionElement<T>, KV<T, ComparableThrowable>> {
+    @Override
+    public KV<T, ComparableThrowable> apply(ExceptionElement<T> f) {
+      return KV.of(f.element(), ComparableThrowable.forThrowable(f.exception()));

Review comment:
       Unless I missed something, this output will be inferred to use `KvCoder.of(<T coder>, SerializableCoder)`. So this may create problems for pipeline update.
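
For background, the sketch below shows what a standard Java serialization round trip does to a `Throwable`, under the assumption that a serialization-based coder encodes elements in this way. It uses only `java.io` and does not touch Beam; the class and method names are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class ThrowableRoundTripDemo {
  /** Serializes and deserializes a Throwable with standard Java serialization. */
  static Throwable roundTrip(Throwable original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Throwable) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Throwable original = new IllegalArgumentException("bad payload");
    Throwable copy = roundTrip(original);

    // Throwable inherits identity-based equals from Object, so the copy is not equal.
    System.out.println(original.equals(copy));
    // The message survives the round trip.
    System.out.println(original.getMessage().equals(copy.getMessage()));
  }
}
```

The copy carries the same message but is a distinct object, which is why a raw `Throwable` would look mutated to an equality-based check after encoding and decoding.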

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like

Review comment:
       `man child classes`?



