You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2018/03/21 14:00:23 UTC
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
GitHub user pnowojski opened a pull request:
https://github.com/apache/flink/pull/5737
[FLINK-8721][flip6] Handle archiving failures for accumulators
During archiving, wrap errors thrown by users' Accumulators into a FailedAccumulator and do not fail the job because of that.
## Verifying this change
This change is covered by existing AccumulatorErrorITCase
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/pnowojski/flink f8721
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5737.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5737
----
commit c61014ae458767926c4f5b13a9df7bc70135d13c
Author: Piotr Nowojski <pi...@...>
Date: 2018-03-21T09:54:49Z
[hotfix][runtime] Remove unused method
commit 655644322f7c61ddd89bcc3e5aab636047b97d55
Author: Piotr Nowojski <pi...@...>
Date: 2018-03-21T12:08:36Z
[FLINK-8721][flip6] Handle archiving failures for accumulators
----
---
[GitHub] flink issue #5737: [FLINK-8721][flip6] Handle archiving failures for accumul...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:
https://github.com/apache/flink/pull/5737
Thanks!
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176517187
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java ---
@@ -48,8 +49,9 @@
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
* @param accumulators A map of all accumulator results produced by the job, in serialized form
*/
- public SerializedJobExecutionResult(JobID jobID, long netRuntime,
- Map<String, SerializedValue<Object>> accumulators) {
+ public SerializedJobExecutionResult(JobID jobID,
+ long netRuntime,
+ Map<String, SerializedValue<OptionalFailure<Object>>> accumulators) {
--- End diff --
Something is off with the indentation here.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176515734
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
+
+ @Nullable
+ private Throwable failureCause;
+
+ private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+ this.value = value;
+ this.failureCause = failureCause;
+ }
+
+ public static <T> OptionalFailure<T> of(T value) {
+ return new OptionalFailure<>(value, null);
+ }
+
+ public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+ return new OptionalFailure<>(null, failureCause);
+ }
+
+ /**
+ * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+ * {@code valueSupplier} has thrown a {@link RuntimeException}.
+ */
+ public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+ try {
+ return OptionalFailure.of(valueSupplier.get());
+ }
+ catch (RuntimeException ex) {
+ LOG.error("Failed to archive accumulators", ex);
+ return OptionalFailure.ofFailure(ex);
+ }
+ }
+
+ /**
+ * @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
+ */
+ public T get() throws FlinkRuntimeException {
+ if (value != null) {
+ return value;
+ }
+ checkNotNull(failureCause);
+ throw new FlinkRuntimeException(failureCause);
+ }
+
+ public Throwable getFailureCause() {
+ return checkNotNull(failureCause);
+ }
+
+ public boolean isFailure() {
+ return failureCause != null;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, failureCause);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object == null) {
+ return false;
+ }
+ if (object == this) {
+ return true;
+ }
+ if (!(object instanceof OptionalFailure)) {
+ return false;
+ }
+ OptionalFailure other = (OptionalFailure) object;
--- End diff --
Let's cast to `OptionalFailure<?>`
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176514312
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
--- End diff --
This type is not serializable. I think you should mark it `transient` and then override `readObject` and `writeObject` similar to how `ArrayList` does it.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176515090
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
+
+ @Nullable
+ private Throwable failureCause;
+
+ private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+ this.value = value;
+ this.failureCause = failureCause;
+ }
+
+ public static <T> OptionalFailure<T> of(T value) {
+ return new OptionalFailure<>(value, null);
+ }
+
+ public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+ return new OptionalFailure<>(null, failureCause);
+ }
+
+ /**
+ * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+ * {@code valueSupplier} has thrown a {@link RuntimeException}.
+ */
+ public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+ try {
+ return OptionalFailure.of(valueSupplier.get());
+ }
+ catch (RuntimeException ex) {
--- End diff --
Not sure whether we should capture the `RuntimeException` here. To me a `supplier` should not throw `RuntimeExceptions`, and if it does, then it should not produce an `OptionalFailure` but instead fail with a `RuntimeException`.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5737
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176517944
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java ---
@@ -21,83 +21,98 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
+
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the SerializedJobExecutionResult
*/
public class SerializedJobExecutionResultTest {
--- End diff --
`extends TestLogger` missing
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176207009
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java ---
@@ -67,22 +72,29 @@ public String getValue() {
int i = 0;
for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
- StringifiedAccumulatorResult result;
- Accumulator<?, ?> accumulator = entry.getValue();
- if (accumulator != null) {
- Object localValue = accumulator.getLocalValue();
- if (localValue != null) {
- result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString());
- } else {
- result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null");
- }
- } else {
- result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
- }
-
- results[i++] = result;
+ results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue());
}
return results;
}
}
+
+ private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator<?, ?> accumulator) {
--- End diff --
`@Nullable` missing for `accumulator`
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176454726
--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
@@ -45,84 +47,87 @@
* b) are not compatible with existing accumulator.
*/
public class AccumulatorErrorITCase extends TestLogger {
-
- private static LocalFlinkMiniCluster cluster;
-
- private static ExecutionEnvironment env;
-
- @BeforeClass
- public static void startCluster() {
+ private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+ private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+ private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
+
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
+ 3));
+
+ public static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- cluster = new LocalFlinkMiniCluster(config, false);
-
- cluster.start();
-
- env = new TestEnvironment(cluster, 6, false);
- }
-
- @AfterClass
- public static void shutdownCluster() {
- cluster.stop();
- cluster = null;
+ return config;
}
@Test
public void testFaultyAccumulator() throws Exception {
-
+ TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --
Is there an equivalent to `.output(new DiscardingOutputFormat<>());` in the `StreamExecutionEnvironment`?
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176208188
--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
@@ -45,84 +47,87 @@
* b) are not compatible with existing accumulator.
*/
public class AccumulatorErrorITCase extends TestLogger {
-
- private static LocalFlinkMiniCluster cluster;
-
- private static ExecutionEnvironment env;
-
- @BeforeClass
- public static void startCluster() {
+ private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+ private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+ private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
+
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
+ 3));
+
+ public static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- cluster = new LocalFlinkMiniCluster(config, false);
-
- cluster.start();
-
- env = new TestEnvironment(cluster, 6, false);
- }
-
- @AfterClass
- public static void shutdownCluster() {
- cluster.stop();
- cluster = null;
+ return config;
}
@Test
public void testFaultyAccumulator() throws Exception {
-
+ TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --
You could also write `StreamExecutionEnvironment.getExecutionEnvironment()`.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176207225
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java ---
@@ -67,22 +72,29 @@ public String getValue() {
int i = 0;
for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
- StringifiedAccumulatorResult result;
- Accumulator<?, ?> accumulator = entry.getValue();
- if (accumulator != null) {
- Object localValue = accumulator.getLocalValue();
- if (localValue != null) {
- result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString());
- } else {
- result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null");
- }
- } else {
- result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
- }
-
- results[i++] = result;
+ results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue());
}
return results;
}
}
+
+ private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator<?, ?> accumulator) {
+ if (accumulator == null) {
+ return new StringifiedAccumulatorResult(name, "null", "null");
+ } else {
+ Object localValue;
+ try {
+ localValue = accumulator.getLocalValue();
+ }
+ catch (RuntimeException exception) {
+ LOG.error("Failed to stringify accumulator", exception);
--- End diff --
Maybe add `name` to log statement.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176676947
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
+
+ @Nullable
+ private Throwable failureCause;
+
+ private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+ this.value = value;
+ this.failureCause = failureCause;
+ }
+
+ public static <T> OptionalFailure<T> of(T value) {
+ return new OptionalFailure<>(value, null);
+ }
+
+ public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+ return new OptionalFailure<>(null, failureCause);
+ }
+
+ /**
+ * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+ * {@code valueSupplier} has thrown a {@link RuntimeException}.
+ */
+ public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+ try {
+ return OptionalFailure.of(valueSupplier.get());
+ }
+ catch (RuntimeException ex) {
+ LOG.error("Failed to archive accumulators", ex);
--- End diff --
Oops, it was pulled in here by accident.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176515453
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
+
+ @Nullable
+ private Throwable failureCause;
+
+ private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+ this.value = value;
+ this.failureCause = failureCause;
+ }
+
+ public static <T> OptionalFailure<T> of(T value) {
+ return new OptionalFailure<>(value, null);
+ }
+
+ public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+ return new OptionalFailure<>(null, failureCause);
+ }
+
+ /**
+ * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+ * {@code valueSupplier} has thrown a {@link RuntimeException}.
+ */
+ public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+ try {
+ return OptionalFailure.of(valueSupplier.get());
+ }
+ catch (RuntimeException ex) {
+ LOG.error("Failed to archive accumulators", ex);
+ return OptionalFailure.ofFailure(ex);
+ }
+ }
+
+ /**
+ * @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
+ */
+ public T get() throws FlinkRuntimeException {
--- End diff --
I think `get` should throw a checked exception and not an unchecked exception. Otherwise users won't be aware of it. We could provide a method `getUnchecked` where we throw an unchecked exception.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176517725
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---
@@ -50,13 +51,13 @@
@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
@JsonSerialize(contentUsing = SerializedValueSerializer.class)
- private Map<String, SerializedValue<Object>> serializedUserAccumulators;
+ private Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators;
@JsonCreator
public JobAccumulatorsInfo(
- @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
- @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators,
- @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<Object>> serializedUserAccumulators) {
+ @JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
+ @JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators,
+ @JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators) {
--- End diff --
indentation is wrong
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176212439
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -763,48 +764,32 @@ public Executor getFutureExecutor() {
return userAccumulators;
}
- /**
- * Gets the accumulator results.
- */
- public Map<String, Object> getAccumulators() {
-
- Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
-
- Map<String, Object> result = new HashMap<>();
- for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
- result.put(entry.getKey(), entry.getValue().getLocalValue());
- }
-
- return result;
- }
-
/**
* Gets a serialized accumulator map.
* @return The accumulator map with serialized accumulator values.
*/
@Override
public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
+ return aggregateUserAccumulators()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
+ }
- Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
-
- Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
- for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-
+ private static SerializedValue<Object> serializeAccumulator(String name, Accumulator<?, ?> accumulator) {
+ try {
+ if (accumulator instanceof FailedAccumulator) {
+ return new SerializedValue<>(accumulator);
+ }
+ return new SerializedValue<>(accumulator.getLocalValue());
+ } catch (IOException ioe) {
+ LOG.error("Could not serialize accumulator " + name + '.', ioe);
try {
- final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
- result.put(entry.getKey(), serializedValue);
- } catch (IOException ioe) {
- LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);
-
- try {
- result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
- } catch (IOException e) {
- throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
- }
+ return new SerializedValue<>(new FailedAccumulator(ioe));
--- End diff --
Hmm, the problem I see here is that in the success case, we store the accumulator value and in the failure case we store an `Accumulator` instance. Thus, the user will expect the accumulator value and casting it accordingly. Thus he will never call the `Accumulator` methods which will throw the exceptions (see `JobExecutionResult` for how the user interacts with the accumulator values). In that sense the previous solution with storing a `FailedAccumulatorSerialization` was also flawed.
What we actually would have to store in the `SerializedValue` is something like an `Either<Throwable, V>`. On the client side when accessing the `accumulatorsValueMap` it should check whether it is left or right and in the left case throw the exception.
Alternatively, we say that an accumulator failure always results in a job failure. This means that in `JobMaster#jobStatusChanged` we generate a failed `ArchivedExecutionGraph` in case of an accumulator failure.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176518316
--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
@@ -45,84 +47,87 @@
* b) are not compatible with existing accumulator.
*/
public class AccumulatorErrorITCase extends TestLogger {
-
- private static LocalFlinkMiniCluster cluster;
-
- private static ExecutionEnvironment env;
-
- @BeforeClass
- public static void startCluster() {
+ private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
+ private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
+ private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
+
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 2,
+ 3));
+
+ public static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- cluster = new LocalFlinkMiniCluster(config, false);
-
- cluster.start();
-
- env = new TestEnvironment(cluster, 6, false);
- }
-
- @AfterClass
- public static void shutdownCluster() {
- cluster.stop();
- cluster = null;
+ return config;
}
@Test
public void testFaultyAccumulator() throws Exception {
-
+ TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment();
--- End diff --
a no-op sink?
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176516910
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
+
+ @Nullable
+ private Throwable failureCause;
+
+ private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+ this.value = value;
+ this.failureCause = failureCause;
+ }
+
+ public static <T> OptionalFailure<T> of(T value) {
+ return new OptionalFailure<>(value, null);
+ }
+
+ public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+ return new OptionalFailure<>(null, failureCause);
+ }
+
+ /**
+ * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+ * {@code valueSupplier} has thrown a {@link RuntimeException}.
+ */
+ public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+ try {
+ return OptionalFailure.of(valueSupplier.get());
+ }
+ catch (RuntimeException ex) {
+ LOG.error("Failed to archive accumulators", ex);
--- End diff --
The message indicates that `OptionalFailure` was implemented with the accumulators in mind, but I think it should be more generic. I guess that `AccumulatorHelper#67` is also the reason why we catch the `RuntimeException` to make the merge supplier as smooth as possible.
---
[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5737#discussion_r176515638
--- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper around an object representing either a success (with a given value) or a failure cause.
+ */
+public class OptionalFailure<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
+
+ @Nullable
+ private T value;
+
+ @Nullable
+ private Throwable failureCause;
+
+ private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
+ this.value = value;
+ this.failureCause = failureCause;
+ }
+
+ public static <T> OptionalFailure<T> of(T value) {
+ return new OptionalFailure<>(value, null);
+ }
+
+ public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
+ return new OptionalFailure<>(null, failureCause);
+ }
+
+ /**
+ * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
+ * {@code valueSupplier} has thrown a {@link RuntimeException}.
+ */
+ public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
+ try {
+ return OptionalFailure.of(valueSupplier.get());
+ }
+ catch (RuntimeException ex) {
+ LOG.error("Failed to archive accumulators", ex);
+ return OptionalFailure.ofFailure(ex);
+ }
+ }
+
+ /**
+ * @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
+ */
+ public T get() throws FlinkRuntimeException {
+ if (value != null) {
+ return value;
+ }
+ checkNotNull(failureCause);
+ throw new FlinkRuntimeException(failureCause);
+ }
+
+ public Throwable getFailureCause() {
+ return checkNotNull(failureCause);
+ }
+
+ public boolean isFailure() {
+ return failureCause != null;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, failureCause);
+ }
+
+ @Override
+ public boolean equals(Object object) {
--- End diff --
why deviate from the superclass' parameter name `obj`?
---