Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2018/03/21 14:00:23 UTC

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5737

    [FLINK-8721][flip6] Handle archiving failures for accumulators

    When archiving accumulators, wrap errors thrown by users' Accumulators in a FailedAccumulator and do not fail the job because of them.
    
    ## Verifying this change
    
    This change is covered by the existing AccumulatorErrorITCase.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f8721

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5737
    
----
commit c61014ae458767926c4f5b13a9df7bc70135d13c
Author: Piotr Nowojski <pi...@...>
Date:   2018-03-21T09:54:49Z

    [hotfix][runtime] Remove unused method

commit 655644322f7c61ddd89bcc3e5aab636047b97d55
Author: Piotr Nowojski <pi...@...>
Date:   2018-03-21T12:08:36Z

    [FLINK-8721][flip6] Handle archiving failures for accumulators

----


---

[GitHub] flink issue #5737: [FLINK-8721][flip6] Handle archiving failures for accumul...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/5737
  
    Thanks!


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176517187
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java ---
    @@ -48,8 +49,9 @@
     	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
     	 * @param accumulators A map of all accumulator results produced by the job, in serialized form
     	 */
    -	public SerializedJobExecutionResult(JobID jobID, long netRuntime,
    -										Map<String, SerializedValue<Object>> accumulators) {
    +	public SerializedJobExecutionResult(JobID jobID,
    +										long netRuntime,
    +										Map<String, SerializedValue<OptionalFailure<Object>>> accumulators) {
    --- End diff --
    
    Something is off with the indentation here.


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176515734
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    +
    +	@Nullable
    +	private Throwable failureCause;
    +
    +	private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
    +		this.value = value;
    +		this.failureCause = failureCause;
    +	}
    +
    +	public static <T> OptionalFailure<T> of(T value) {
    +		return new OptionalFailure<>(value, null);
    +	}
    +
    +	public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
    +		return new OptionalFailure<>(null, failureCause);
    +	}
    +
    +	/**
    +	 * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
    +	 * {@code valueSupplier} has thrown a {@link RuntimeException}.
    +	 */
    +	public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
    +		try {
    +			return OptionalFailure.of(valueSupplier.get());
    +		}
    +		catch (RuntimeException ex) {
    +			LOG.error("Failed to archive accumulators", ex);
    +			return OptionalFailure.ofFailure(ex);
    +		}
    +	}
    +
    +	/**
    +	 * @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
    +	 */
    +	public T get() throws FlinkRuntimeException {
    +		if (value != null) {
    +			return value;
    +		}
    +		checkNotNull(failureCause);
    +		throw new FlinkRuntimeException(failureCause);
    +	}
    +
    +	public Throwable getFailureCause() {
    +		return checkNotNull(failureCause);
    +	}
    +
    +	public boolean isFailure() {
    +		return failureCause != null;
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(value, failureCause);
    +	}
    +
    +	@Override
    +	public boolean equals(Object object) {
    +		if (object == null) {
    +			return false;
    +		}
    +		if (object == this) {
    +			return true;
    +		}
    +		if (!(object instanceof OptionalFailure)) {
    +			return false;
    +		}
    +		OptionalFailure other = (OptionalFailure) object;
    --- End diff --
    
    Let's cast to `OptionalFailure<?>`


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176514312
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    --- End diff --
    
    This type is not serializable. I think you should mark it `transient` and then override `readObject` and `writeObject` similar to how `ArrayList` does it.
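
    A minimal sketch of the `ArrayList`-style approach, assuming `value` is marked `transient`, is actually serializable at runtime, and that `java.io.ObjectInputStream`/`ObjectOutputStream` are imported (field names as in the class above):

        private void writeObject(ObjectOutputStream out) throws IOException {
            // write the non-transient fields (failureCause) the default way
            out.defaultWriteObject();
            // manually write the transient value; this fails at serialization time
            // (rather than at the field declaration) if the runtime object is not serializable
            out.writeObject(value);
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            value = (T) in.readObject();
        }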


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176515090
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    +
    +	@Nullable
    +	private Throwable failureCause;
    +
    +	private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
    +		this.value = value;
    +		this.failureCause = failureCause;
    +	}
    +
    +	public static <T> OptionalFailure<T> of(T value) {
    +		return new OptionalFailure<>(value, null);
    +	}
    +
    +	public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
    +		return new OptionalFailure<>(null, failureCause);
    +	}
    +
    +	/**
    +	 * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
    +	 * {@code valueSupplier} has thrown a {@link RuntimeException}.
    +	 */
    +	public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
    +		try {
    +			return OptionalFailure.of(valueSupplier.get());
    +		}
    +		catch (RuntimeException ex) {
    --- End diff --
    
    Not sure whether we should catch the `RuntimeException` here. To me, a `supplier` should not throw `RuntimeException`s, and if it does, it should not produce an `OptionalFailure` but instead fail with a `RuntimeException`.


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5737


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176517944
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java ---
    @@ -21,83 +21,98 @@
     import org.apache.flink.api.common.JobExecutionResult;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.core.testutils.CommonTestUtils;
    +import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkRuntimeException;
    +import org.apache.flink.util.OptionalFailure;
     import org.apache.flink.util.SerializedValue;
    +
     import org.junit.Test;
     
     import java.util.HashMap;
     import java.util.Map;
     import java.util.concurrent.TimeUnit;
     
    -import static org.junit.Assert.*;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
     
     /**
      * Tests for the SerializedJobExecutionResult
      */
     public class SerializedJobExecutionResultTest {
    --- End diff --
    
    `extends TestLogger` missing


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176207009
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java ---
    @@ -67,22 +72,29 @@ public String getValue() {
     
     			int i = 0;
     			for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
    -				StringifiedAccumulatorResult result;
    -				Accumulator<?, ?> accumulator = entry.getValue();
    -				if (accumulator != null) {
    -					Object localValue = accumulator.getLocalValue();
    -					if (localValue != null) {
    -						result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString());
    -					} else {
    -						result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null");
    -					}
    -				} else {
    -					result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
    -				}
    -
    -				results[i++] = result;
    +				results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue());
     			}
     			return results;
     		}
     	}
    +
    +	private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator<?, ?> accumulator) {
    --- End diff --
    
    `@Nullable` missing for `accumulator`


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176454726
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
    @@ -45,84 +47,87 @@
      *  b) are not compatible with existing accumulator.
      */
     public class AccumulatorErrorITCase extends TestLogger {
    -
    -	private static LocalFlinkMiniCluster cluster;
    -
    -	private static ExecutionEnvironment env;
    -
    -	@BeforeClass
    -	public static void startCluster() {
    +	private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
    +	private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
    +	private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
    +
    +	@ClassRule
    +	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			getConfiguration(),
    +			2,
    +			3));
    +
    +	public static Configuration getConfiguration() {
     		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
     		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
    -		cluster = new LocalFlinkMiniCluster(config, false);
    -
    -		cluster.start();
    -
    -		env = new TestEnvironment(cluster, 6, false);
    -	}
    -
    -	@AfterClass
    -	public static void shutdownCluster() {
    -		cluster.stop();
    -		cluster = null;
    +		return config;
     	}
     
     	@Test
     	public void testFaultyAccumulator() throws Exception {
    -
    +		TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment();
    --- End diff --
    
    Is there an equivalent to `.output(new DiscardingOutputFormat<>());` in the `StreamExecutionEnvironment`?


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176208188
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
    @@ -45,84 +47,87 @@
      *  b) are not compatible with existing accumulator.
      */
     public class AccumulatorErrorITCase extends TestLogger {
    -
    -	private static LocalFlinkMiniCluster cluster;
    -
    -	private static ExecutionEnvironment env;
    -
    -	@BeforeClass
    -	public static void startCluster() {
    +	private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
    +	private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
    +	private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
    +
    +	@ClassRule
    +	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			getConfiguration(),
    +			2,
    +			3));
    +
    +	public static Configuration getConfiguration() {
     		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
     		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
    -		cluster = new LocalFlinkMiniCluster(config, false);
    -
    -		cluster.start();
    -
    -		env = new TestEnvironment(cluster, 6, false);
    -	}
    -
    -	@AfterClass
    -	public static void shutdownCluster() {
    -		cluster.stop();
    -		cluster = null;
    +		return config;
     	}
     
     	@Test
     	public void testFaultyAccumulator() throws Exception {
    -
    +		TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment();
    --- End diff --
    
    You could also write `StreamExecutionEnvironment.getExecutionEnvironment()`.


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176207225
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java ---
    @@ -67,22 +72,29 @@ public String getValue() {
     
     			int i = 0;
     			for (Map.Entry<String, Accumulator<?, ?>> entry : accs.entrySet()) {
    -				StringifiedAccumulatorResult result;
    -				Accumulator<?, ?> accumulator = entry.getValue();
    -				if (accumulator != null) {
    -					Object localValue = accumulator.getLocalValue();
    -					if (localValue != null) {
    -						result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString());
    -					} else {
    -						result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null");
    -					}
    -				} else {
    -					result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null");
    -				}
    -
    -				results[i++] = result;
    +				results[i++] = stringifyAccumulatorResult(entry.getKey(), entry.getValue());
     			}
     			return results;
     		}
     	}
    +
    +	private static StringifiedAccumulatorResult stringifyAccumulatorResult(String name, Accumulator<?, ?> accumulator) {
    +		if (accumulator == null) {
    +			return new StringifiedAccumulatorResult(name, "null", "null");
    +		} else {
    +			Object localValue;
    +			try {
    +				localValue = accumulator.getLocalValue();
    +			}
    +			catch (RuntimeException exception) {
    +				LOG.error("Failed to stringify accumulator", exception);
    --- End diff --
    
    Maybe add `name` to the log statement.
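
    For example, one possible wording:

        LOG.error("Failed to stringify accumulator (" + name + ')', exception);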


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176676947
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    +
    +	@Nullable
    +	private Throwable failureCause;
    +
    +	private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
    +		this.value = value;
    +		this.failureCause = failureCause;
    +	}
    +
    +	public static <T> OptionalFailure<T> of(T value) {
    +		return new OptionalFailure<>(value, null);
    +	}
    +
    +	public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
    +		return new OptionalFailure<>(null, failureCause);
    +	}
    +
    +	/**
    +	 * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
    +	 * {@code valueSupplier} has thrown a {@link RuntimeException}.
    +	 */
    +	public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
    +		try {
    +			return OptionalFailure.of(valueSupplier.get());
    +		}
    +		catch (RuntimeException ex) {
    +			LOG.error("Failed to archive accumulators", ex);
    --- End diff --
    
    Oops, it was pulled in here by accident.


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176515453
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    +
    +	@Nullable
    +	private Throwable failureCause;
    +
    +	private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
    +		this.value = value;
    +		this.failureCause = failureCause;
    +	}
    +
    +	public static <T> OptionalFailure<T> of(T value) {
    +		return new OptionalFailure<>(value, null);
    +	}
    +
    +	public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
    +		return new OptionalFailure<>(null, failureCause);
    +	}
    +
    +	/**
    +	 * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
    +	 * {@code valueSupplier} has thrown a {@link RuntimeException}.
    +	 */
    +	public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
    +		try {
    +			return OptionalFailure.of(valueSupplier.get());
    +		}
    +		catch (RuntimeException ex) {
    +			LOG.error("Failed to archive accumulators", ex);
    +			return OptionalFailure.ofFailure(ex);
    +		}
    +	}
    +
    +	/**
    +	 * @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
    +	 */
    +	public T get() throws FlinkRuntimeException {
    --- End diff --
    
    I think `get` should throw a checked exception and not an unchecked exception. Otherwise users won't be aware of it. We could provide a method `getUnchecked` where we throw an unchecked exception.
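
    A minimal sketch of the suggested split, reusing the field names from the class above and assuming `FlinkException` is acceptable as the checked type:

        /**
         * @return the stored value, or throws a checked {@link FlinkException} wrapping {@code failureCause}.
         */
        public T get() throws FlinkException {
            if (value != null) {
                return value;
            }
            checkNotNull(failureCause);
            throw new FlinkException(failureCause);
        }

        /**
         * @return the stored value, or throws an unchecked {@link FlinkRuntimeException} wrapping {@code failureCause}.
         */
        public T getUnchecked() throws FlinkRuntimeException {
            if (value != null) {
                return value;
            }
            checkNotNull(failureCause);
            throw new FlinkRuntimeException(failureCause);
        }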


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176517725
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---
    @@ -50,13 +51,13 @@
     
     	@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
     	@JsonSerialize(contentUsing = SerializedValueSerializer.class)
    -	private Map<String, SerializedValue<Object>> serializedUserAccumulators;
    +	private Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators;
     
     	@JsonCreator
     	public JobAccumulatorsInfo(
    -			@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
    -			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators,
    -			@JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<Object>> serializedUserAccumulators) {
    +		@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
    +		@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators,
    +		@JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators) {
    --- End diff --
    
    indentation is wrong


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176212439
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -763,48 +764,32 @@ public Executor getFutureExecutor() {
     		return userAccumulators;
     	}
     
    -	/**
    -	 * Gets the accumulator results.
    -	 */
    -	public Map<String, Object> getAccumulators() {
    -
    -		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    -
    -		Map<String, Object> result = new HashMap<>();
    -		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -			result.put(entry.getKey(), entry.getValue().getLocalValue());
    -		}
    -
    -		return result;
    -	}
    -
     	/**
     	 * Gets a serialized accumulator map.
     	 * @return The accumulator map with serialized accumulator values.
     	 */
     	@Override
     	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
    +		return aggregateUserAccumulators()
    +			.entrySet()
    +			.stream()
    +			.collect(Collectors.toMap(Map.Entry::getKey, entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
    +	}
     
    -		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    -
    -		Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
    -		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -
    +	private static SerializedValue<Object> serializeAccumulator(String name, Accumulator<?, ?> accumulator) {
    +		try {
    +			if (accumulator instanceof FailedAccumulator) {
    +				return new SerializedValue<>(accumulator);
    +			}
    +			return new SerializedValue<>(accumulator.getLocalValue());
    +		} catch (IOException ioe) {
    +			LOG.error("Could not serialize accumulator " + name + '.', ioe);
     			try {
    -				final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    -				result.put(entry.getKey(), serializedValue);
    -			} catch (IOException ioe) {
    -				LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    -
    -				try {
    -					result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
    -				} catch (IOException e) {
    -					throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
    -				}
    +				return new SerializedValue<>(new FailedAccumulator(ioe));
    --- End diff --
    
    Hmm, the problem I see here is that in the success case we store the accumulator value, while in the failure case we store an `Accumulator` instance. Thus, the user will expect the accumulator value and cast it accordingly, and will never call the `Accumulator` methods which would throw the exceptions (see `JobExecutionResult` for how the user interacts with the accumulator values). In that sense, the previous solution of storing a `FailedAccumulatorSerialization` was also flawed.
    
    What we actually would have to store in the `SerializedValue` is something like an `Either<Throwable, V>`. On the client side, when accessing the `accumulatorsValueMap`, we should check whether the value is a left or a right and, in the left case, throw the exception.
    
    Alternatively, we say that an accumulator failure always results in a job failure. This means that in `JobMaster#jobStatusChanged` we generate a failed `ArchivedExecutionGraph` in case of an accumulator failure.
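
    A minimal sketch of the `Either<Throwable, V>` access mentioned above, assuming Flink's `org.apache.flink.types.Either` and a hypothetical client-side helper:

        private static Object unwrapAccumulatorValue(Either<Throwable, Object> archivedResult) {
            if (archivedResult.isLeft()) {
                // archiving failed; surface the stored cause to the user instead of a bogus value
                throw new FlinkRuntimeException(archivedResult.left());
            }
            return archivedResult.right();
        }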


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176518316
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
    @@ -45,84 +47,87 @@
      *  b) are not compatible with existing accumulator.
      */
     public class AccumulatorErrorITCase extends TestLogger {
    -
    -	private static LocalFlinkMiniCluster cluster;
    -
    -	private static ExecutionEnvironment env;
    -
    -	@BeforeClass
    -	public static void startCluster() {
    +	private static final String FAULTY_CLONE_ACCUMULATOR = "faulty-clone";
    +	private static final String FAULTY_MERGE_ACCUMULATOR = "faulty-merge";
    +	private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators";
    +
    +	@ClassRule
    +	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			getConfiguration(),
    +			2,
    +			3));
    +
    +	public static Configuration getConfiguration() {
     		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
     		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
    -		cluster = new LocalFlinkMiniCluster(config, false);
    -
    -		cluster.start();
    -
    -		env = new TestEnvironment(cluster, 6, false);
    -	}
    -
    -	@AfterClass
    -	public static void shutdownCluster() {
    -		cluster.stop();
    -		cluster = null;
    +		return config;
     	}
     
     	@Test
     	public void testFaultyAccumulator() throws Exception {
    -
    +		TestEnvironment env = MINI_CLUSTER_RESOURCE.getTestEnvironment();
    --- End diff --
    
    a no-op sink?
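
    For example, something along these lines (assuming the test's map function is ported to the DataStream API; the mapper name below is hypothetical):

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
            .map(new FaultyAccumulatorUsingMapper())   // hypothetical MapFunction that registers the faulty accumulator
            .addSink(new DiscardingSink<>());          // no-op sink from org.apache.flink.streaming.api.functions.sink
        env.execute();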


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176516910
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    +
    +	@Nullable
    +	private Throwable failureCause;
    +
    +	private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
    +		this.value = value;
    +		this.failureCause = failureCause;
    +	}
    +
    +	public static <T> OptionalFailure<T> of(T value) {
    +		return new OptionalFailure<>(value, null);
    +	}
    +
    +	public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
    +		return new OptionalFailure<>(null, failureCause);
    +	}
    +
    +	/**
    +	 * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
    +	 * {@code valueSupplier} has thrown a {@link RuntimeException}.
    +	 */
    +	public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
    +		try {
    +			return OptionalFailure.of(valueSupplier.get());
    +		}
    +		catch (RuntimeException ex) {
    +			LOG.error("Failed to archive accumulators", ex);
    --- End diff --
    
    The message indicates that `OptionalFailure` was implemented with the accumulators in mind, but I think it should be more generic. I guess that `AccumulatorHelper#67` is also the reason why we catch the `RuntimeException`, to make the merge supplier as smooth as possible.


---

[GitHub] flink pull request #5737: [FLINK-8721][flip6] Handle archiving failures for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5737#discussion_r176515638
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/OptionalFailure.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.Objects;
    +import java.util.function.Supplier;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Wrapper around an object representing either a success (with a given value) or a failure cause.
    + */
    +public class OptionalFailure<T> implements Serializable {
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(OptionalFailure.class);
    +
    +	@Nullable
    +	private T value;
    +
    +	@Nullable
    +	private Throwable failureCause;
    +
    +	private OptionalFailure(@Nullable T value, @Nullable Throwable failureCause) {
    +		this.value = value;
    +		this.failureCause = failureCause;
    +	}
    +
    +	public static <T> OptionalFailure<T> of(T value) {
    +		return new OptionalFailure<>(value, null);
    +	}
    +
    +	public static <T> OptionalFailure<T> ofFailure(Throwable failureCause) {
    +		return new OptionalFailure<>(null, failureCause);
    +	}
    +
    +	/**
    +	 * @return wrapped {@link OptionalFailure} returned by {@code valueSupplier} or wrapped failure if
    +	 * {@code valueSupplier} has thrown a {@link RuntimeException}.
    +	 */
    +	public static <T> OptionalFailure<T> createFrom(Supplier<T> valueSupplier) {
    +		try {
    +			return OptionalFailure.of(valueSupplier.get());
    +		}
    +		catch (RuntimeException ex) {
    +			LOG.error("Failed to archive accumulators", ex);
    +			return OptionalFailure.ofFailure(ex);
    +		}
    +	}
    +
    +	/**
    +	 * @return stored value or throw a {@link FlinkRuntimeException} with {@code failureCause}.
    +	 */
    +	public T get() throws FlinkRuntimeException {
    +		if (value != null) {
    +			return value;
    +		}
    +		checkNotNull(failureCause);
    +		throw new FlinkRuntimeException(failureCause);
    +	}
    +
    +	public Throwable getFailureCause() {
    +		return checkNotNull(failureCause);
    +	}
    +
    +	public boolean isFailure() {
    +		return failureCause != null;
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(value, failureCause);
    +	}
    +
    +	@Override
    +	public boolean equals(Object object) {
    --- End diff --
    
    Why deviate from the super class' parameter name `obj`?


---