You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/20 18:21:41 UTC
[flink] 02/03: [hotfix][runtime] Fix checkstyle in org.apache.flink.runtime.checkpoint (main scope)
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90dd45bfb04324ce559e58d9362ea561698b5ff3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Dec 8 15:17:23 2019 +0100
[hotfix][runtime] Fix checkstyle in org.apache.flink.runtime.checkpoint (main scope)
Test scope is not included in this fix.
---
.../runtime/checkpoint/AbstractCheckpointStats.java | 1 +
.../checkpoint/CheckpointCoordinatorGateway.java | 3 +++
.../runtime/checkpoint/CheckpointFailureManager.java | 2 +-
.../flink/runtime/checkpoint/CheckpointMetaData.java | 4 ++--
.../flink/runtime/checkpoint/CheckpointMetrics.java | 10 +++++-----
.../runtime/checkpoint/CheckpointRetentionPolicy.java | 2 +-
.../runtime/checkpoint/CheckpointStatsHistory.java | 10 +++++-----
.../runtime/checkpoint/CheckpointStatsSnapshot.java | 2 +-
.../runtime/checkpoint/CompletedCheckpointStore.java | 2 +-
.../DefaultLastStateConnectionStateListener.java | 3 +++
.../runtime/checkpoint/FailedCheckpointStats.java | 1 +
.../apache/flink/runtime/checkpoint/OperatorState.java | 16 ++++++++--------
.../flink/runtime/checkpoint/OperatorSubtaskState.java | 2 +-
.../runtime/checkpoint/PendingCheckpointStats.java | 3 ++-
.../checkpoint/PrioritizedOperatorSubtaskState.java | 7 +++++--
.../runtime/checkpoint/StateAssignmentOperation.java | 8 ++++----
.../runtime/checkpoint/StateObjectCollection.java | 1 +
.../apache/flink/runtime/checkpoint/SubtaskState.java | 4 ++--
.../org/apache/flink/runtime/checkpoint/TaskState.java | 11 +++++------
.../flink/runtime/checkpoint/TaskStateStats.java | 1 +
.../checkpoint/ZooKeeperCheckpointRecoveryFactory.java | 3 ++-
.../checkpoint/ZooKeeperCompletedCheckpointStore.java | 5 ++---
.../flink/runtime/checkpoint/hooks/MasterHooks.java | 13 +++----------
.../checkpoint/savepoint/SavepointSerializers.java | 6 +++---
.../runtime/checkpoint/savepoint/SavepointV1.java | 4 ++--
.../checkpoint/savepoint/SavepointV1Serializer.java | 7 ++++---
.../runtime/checkpoint/savepoint/SavepointV2.java | 18 +++++++++---------
.../checkpoint/savepoint/SavepointV2Serializer.java | 16 ++++++++--------
.../runtime/state/memory/ByteStreamStateHandle.java | 2 +-
.../savepoint/SavepointV2SerializerTest.java | 10 +++++-----
tools/maven/suppressions-runtime.xml | 6 +-----
31 files changed, 93 insertions(+), 90 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
index 5b3c7c7..f49790d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index b8dc554..a199820 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -23,6 +23,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
+/**
+ * RPC Gateway interface for messages to the CheckpointCoordinator.
+ */
public interface CheckpointCoordinatorGateway extends RpcGateway {
void acknowledgeCheckpoint(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 3e73e7f..9cd8fba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -142,7 +142,7 @@ public class CheckpointFailureManager {
* @param checkpointId the failed checkpoint id used to count the continuous failure number based on
* checkpoint id sequence.
*/
- public void handleCheckpointSuccess(long checkpointId) {
+ public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) {
clearCount();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
index 9960b44..aa31885 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java
@@ -27,10 +27,10 @@ public class CheckpointMetaData implements Serializable {
private static final long serialVersionUID = -2387652345781312442L;
- /** The ID of the checkpoint */
+ /** The ID of the checkpoint. */
private final long checkpointId;
- /** The timestamp of the checkpoint */
+ /** The timestamp of the checkpoint. */
private final long timestamp;
public CheckpointMetaData(long checkpointId, long timestamp) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index 4307a73..d18feb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -30,16 +30,16 @@ public class CheckpointMetrics implements Serializable {
private static final long serialVersionUID = 1L;
- /** The number of bytes that were buffered during the checkpoint alignment phase */
+ /** The number of bytes that were buffered during the checkpoint alignment phase. */
private long bytesBufferedInAlignment;
- /** The duration (in nanoseconds) that the stream alignment for the checkpoint took */
+ /** The duration (in nanoseconds) that the stream alignment for the checkpoint took. */
private long alignmentDurationNanos;
- /* The duration (in milliseconds) of the synchronous part of the operator checkpoint */
+ /** The duration (in milliseconds) of the synchronous part of the operator checkpoint. */
private long syncDurationMillis;
- /* The duration (in milliseconds) of the asynchronous part of the operator checkpoint */
+ /** The duration (in milliseconds) of the asynchronous part of the operator checkpoint. */
private long asyncDurationMillis;
private long checkpointStartDelayNanos;
@@ -121,7 +121,7 @@ public class CheckpointMetrics implements Serializable {
CheckpointMetrics that = (CheckpointMetrics) o;
- return bytesBufferedInAlignment == that.bytesBufferedInAlignment &&
+ return bytesBufferedInAlignment == that.bytesBufferedInAlignment &&
alignmentDurationNanos == that.alignmentDurationNanos &&
syncDurationMillis == that.syncDurationMillis &&
asyncDurationMillis == that.asyncDurationMillis &&
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
index 3bd124d..01e5f93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
@@ -33,5 +33,5 @@ public enum CheckpointRetentionPolicy {
RETAIN_ON_FAILURE,
/** Checkpoints should always be cleaned up when an application reaches a terminal state. */
- NEVER_RETAIN_AFTER_TERMINATION;
+ NEVER_RETAIN_AFTER_TERMINATION
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 9db302c..2892c87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -92,8 +92,8 @@ public class CheckpointStatsHistory implements Serializable {
false,
maxSize,
new AbstractCheckpointStats[0],
- Collections.<AbstractCheckpointStats>emptyList(),
- Collections.<Long, AbstractCheckpointStats>emptyMap(),
+ Collections.emptyList(),
+ Collections.emptyMap(),
null,
null,
null);
@@ -116,9 +116,9 @@ public class CheckpointStatsHistory implements Serializable {
AbstractCheckpointStats[] checkpointArray,
List<AbstractCheckpointStats> checkpointsHistory,
Map<Long, AbstractCheckpointStats> checkpointsById,
- CompletedCheckpointStats latestCompletedCheckpoint,
- FailedCheckpointStats latestFailedCheckpoint,
- CompletedCheckpointStats latestSavepoint) {
+ @Nullable CompletedCheckpointStats latestCompletedCheckpoint,
+ @Nullable FailedCheckpointStats latestFailedCheckpoint,
+ @Nullable CompletedCheckpointStats latestSavepoint) {
this.readOnly = readOnly;
checkArgument(maxSize >= 0, "Negative maximum size");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
index 7d787c2..aa20313 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
@@ -59,7 +59,7 @@ public class CheckpointStatsSnapshot implements Serializable {
@Nullable RestoredCheckpointStats latestRestoredCheckpoint) {
this.counts = checkNotNull(counts);
- this.summary= checkNotNull(summary);
+ this.summary = checkNotNull(summary);
this.history = checkNotNull(history);
this.latestRestoredCheckpoint = latestRestoredCheckpoint;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index c7d51b6..42f608b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -107,7 +107,7 @@ public interface CompletedCheckpointStore {
* This method returns whether the completed checkpoint store requires checkpoints to be
* externalized. Externalized checkpoints have their meta data persisted, which the checkpoint
* store can exploit (for example by simply pointing the persisted metadata).
- *
+ *
* @return True, if the store requires that checkpoints are externalized before being added, false
* if the store stores the metadata itself.
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java
index 8004b32..4b275e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java
@@ -25,6 +25,9 @@ import javax.annotation.Nullable;
import java.util.Optional;
+/**
+ * A simple ConnectionState listener that remembers the last state.
+ */
public class DefaultLastStateConnectionStateListener implements LastStateConnectionStateListener {
@Nullable
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index 2f596a7..1eb0131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
+
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkArgument;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index a5f908d7..33eccf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -37,16 +37,16 @@ public class OperatorState implements CompositeStateHandle {
private static final long serialVersionUID = -4845578005863201810L;
- /** id of the operator */
+ /** The id of the operator. */
private final OperatorID operatorID;
- /** handles to non-partitioned states, subtaskindex -> subtaskstate */
+ /** The handles to states created by the parallel tasks: subtaskIndex -> subtaskstate. */
private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
- /** parallelism of the operator when it was checkpointed */
+ /** The parallelism of the operator when it was checkpointed. */
private final int parallelism;
- /** maximum parallelism of the operator when the job was first created */
+ /** The maximum parallelism (for number of keygroups) of the operator when the job was first created. */
private final int maxParallelism;
public OperatorState(OperatorID operatorID, int parallelism, int maxParallelism) {
@@ -86,6 +86,10 @@ public class OperatorState implements CompositeStateHandle {
}
}
+ public Map<Integer, OperatorSubtaskState> getSubtaskStates() {
+ return Collections.unmodifiableMap(operatorSubtaskStates);
+ }
+
public Collection<OperatorSubtaskState> getStates() {
return operatorSubtaskStates.values();
}
@@ -148,10 +152,6 @@ public class OperatorState implements CompositeStateHandle {
return parallelism + 31 * Objects.hash(operatorID, operatorSubtaskStates);
}
- public Map<Integer, OperatorSubtaskState> getSubtaskStates() {
- return Collections.unmodifiableMap(operatorSubtaskStates);
- }
-
@Override
public String toString() {
// KvStates are always null in 1.1. Don't print this as it might
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index c2b8e9a..87ab061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.CompositeStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index 9b21747..323979d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
@@ -45,7 +46,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
private static final long serialVersionUID = -973959257699390327L;
/** Tracker callback when the pending checkpoint is finalized or aborted. */
- private transient final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
+ private final transient CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
/** The current number of acknowledged subtasks. */
private volatile int currentNumAcknowledgedSubtasks;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
index 52b30a1..0a949ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.commons.lang3.BooleanUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateObject;
+import org.apache.commons.lang3.BooleanUtils;
+
import javax.annotation.Nonnull;
import java.util.ArrayList;
@@ -162,7 +163,6 @@ public class PrioritizedOperatorSubtaskState {
return restored;
}
-
private static <T extends StateObject> StateObjectCollection<T> lastElement(List<StateObjectCollection<T>> list) {
return list.get(list.size() - 1);
}
@@ -174,6 +174,9 @@ public class PrioritizedOperatorSubtaskState {
return EMPTY_NON_RESTORED_INSTANCE;
}
+ /**
+ * A builder for PrioritizedOperatorSubtaskState.
+ */
@Internal
public static class Builder {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index ac7a9d8..8e7b355 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -388,7 +388,7 @@ public class StateAssignmentOperation {
/**
* Collect {@link KeyGroupsStateHandle managedKeyedStateHandles} which have intersection with given
- * {@link KeyGroupRange} from {@link TaskState operatorState}
+ * {@link KeyGroupRange} from {@link TaskState operatorState}.
*
* @param operatorState all state handles of a operator
* @param subtaskKeyGroupRange the KeyGroupRange of a subtask
@@ -423,7 +423,7 @@ public class StateAssignmentOperation {
/**
* Collect {@link KeyGroupsStateHandle rawKeyedStateHandles} which have intersection with given
- * {@link KeyGroupRange} from {@link TaskState operatorState}
+ * {@link KeyGroupRange} from {@link TaskState operatorState}.
*
* @param operatorState all state handles of a operator
* @param subtaskKeyGroupRange the KeyGroupRange of a subtask
@@ -481,8 +481,8 @@ public class StateAssignmentOperation {
* Groups the available set of key groups into key group partitions. A key group partition is
* the set of key groups which is assigned to the same task. Each set of the returned list
* constitutes a key group partition.
- * <p>
- * <b>IMPORTANT</b>: The assignment of key groups to partitions has to be in sync with the
+ *
+ * <p><b>IMPORTANT</b>: The assignment of key groups to partitions has to be in sync with the
* KeyGroupStreamPartitioner.
*
* @param numberKeyGroups Number of available key groups (indexed from 0 to numberKeyGroups - 1)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
index 3076847..9d4c32b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
@@ -191,6 +191,7 @@ public class StateObjectCollection<T extends StateObject> implements Collection<
// Helper methods.
// ------------------------------------------------------------------------
+ @SuppressWarnings("unchecked")
public static <T extends StateObject> StateObjectCollection<T> empty() {
return (StateObjectCollection<T>) EMPTY;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 5aab33a..1407fc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CompositeStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
@@ -90,7 +90,7 @@ public class SubtaskState implements CompositeStateHandle {
}
}
- private static final long getSizeNullSafe(StateObject stateObject) throws Exception {
+ private static long getSizeNullSafe(StateObject stateObject) throws Exception {
return stateObject != null ? stateObject.getStateSize() : 0L;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 0f3bedb..e2c0bf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -33,28 +33,27 @@ import java.util.Objects;
* Simple container class which contains the task state and key-group state handles for the sub
* tasks of a {@link org.apache.flink.runtime.jobgraph.JobVertex}.
*
- * This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together.
+ * <p>This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together.
*
* @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes.
*/
@Deprecated
-@SuppressWarnings("deprecation")
public class TaskState implements CompositeStateHandle {
private static final long serialVersionUID = -4845578005863201810L;
private final JobVertexID jobVertexID;
- /** handles to non-partitioned states, subtaskindex -> subtaskstate */
+ /** handles to non-partitioned states, subtaskindex -> subtaskstate. */
private final Map<Integer, SubtaskState> subtaskStates;
- /** parallelism of the operator when it was checkpointed */
+ /** parallelism of the operator when it was checkpointed. */
private final int parallelism;
- /** maximum parallelism of the operator when the job was first created */
+ /** maximum parallelism of the operator when the job was first created. */
private final int maxParallelism;
- /** length of the operator chain */
+ /** length of the operator chain. */
private final int chainLength;
public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism, int chainLength) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
index 084d7dd..b1e2f99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
+
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkArgument;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 069cce5..4311b68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.curator.framework.CuratorFramework;
+
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 10083fc..b4f72b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -169,7 +169,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
- CompletedCheckpoint completedCheckpoint = null;
+ CompletedCheckpoint completedCheckpoint;
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
@@ -245,8 +245,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
@Override
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
- List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints);
- return checkpoints;
+ return new ArrayList<>(completedCheckpoints);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 3a16900..e9ffbe3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -58,7 +58,7 @@ public class MasterHooks {
*/
public static void reset(
final Collection<MasterTriggerRestoreHook<?>> hooks,
- final Logger log) throws FlinkException {
+ @SuppressWarnings("unused") final Logger log) throws FlinkException {
for (MasterTriggerRestoreHook<?> hook : hooks) {
final String id = hook.getIdentifier();
@@ -76,12 +76,10 @@ public class MasterHooks {
* Closes the master hooks.
*
* @param hooks The hooks to close
- *
- * @throws FlinkException Thrown, if the hooks throw an exception.
*/
public static void close(
final Collection<MasterTriggerRestoreHook<?>> hooks,
- final Logger log) throws FlinkException {
+ final Logger log) {
for (MasterTriggerRestoreHook<?> hook : hooks) {
try {
@@ -320,12 +318,7 @@ public class MasterHooks {
@Nullable
@Override
public CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
- final Executor wrappedExecutor = new Executor() {
- @Override
- public void execute(Runnable command) {
- executor.execute(new WrappedCommand(userClassLoader, command));
- }
- };
+ final Executor wrappedExecutor = command -> executor.execute(new WrappedCommand(userClassLoader, command));
return LambdaUtil.withContextClassLoader(
userClassLoader,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 12e9c5b..25b63b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -29,8 +29,8 @@ import java.util.Map;
*/
public class SavepointSerializers {
- /** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format) */
- static boolean FAIL_WHEN_LEGACY_STATE_DETECTED = true;
+ /** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */
+ static boolean failWhenLegacyStateDetected = true;
private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
@@ -87,6 +87,6 @@ public class SavepointSerializers {
*/
@VisibleForTesting
public static void setFailWhenLegacyStateDetected(boolean fail) {
- FAIL_WHEN_LEGACY_STATE_DETECTED = fail;
+ failWhenLegacyStateDetected = fail;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
index daf5b7f..69e7695 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -35,10 +35,10 @@ public class SavepointV1 implements Savepoint {
/** The savepoint version. */
public static final int VERSION = 1;
- /** The checkpoint ID */
+ /** The checkpoint ID. */
private final long checkpointId;
- /** The task states */
+ /** The task states. */
private final Collection<TaskState> taskStates;
public SavepointV1(long checkpointId, Collection<TaskState> taskStates) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index b3e6e89..e56d4be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -25,11 +25,11 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -53,6 +53,7 @@ import java.util.Map;
* classes to stay the same.
*/
@Internal
+@SuppressWarnings("deprecation")
public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
private static final byte NULL_HANDLE = 0;
@@ -164,11 +165,11 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
// Duration field has been removed from SubtaskState
- long ignoredDuration = dis.readLong();
+ dis.readLong();
int len = dis.readInt();
- if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) {
+ if (SavepointSerializers.failWhenLegacyStateDetected) {
Preconditions.checkState(len == 0,
"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " +
"no longer supported starting from Flink 1.4. Please rewrite your job to use " +
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 466917b..6dc628d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -27,8 +27,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
@@ -48,27 +48,27 @@ public class SavepointV2 implements Savepoint {
/** The savepoint version. */
public static final int VERSION = 2;
- /** The checkpoint ID */
+ /** The checkpoint ID. */
private final long checkpointId;
/**
- * The task states
- * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
+ * The task states.
+ * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
*/
@Deprecated
private final Collection<TaskState> taskStates;
- /** The operator states */
+ /** The operator states. */
private final Collection<OperatorState> operatorStates;
- /** The states generated by the CheckpointCoordinator */
+ /** The states generated by the CheckpointCoordinator. */
private final Collection<MasterState> masterStates;
/** @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. */
@Deprecated
public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
this(
- checkpointId,
+ checkpointId,
null,
checkNotNull(taskStates, "taskStates"),
Collections.<MasterState>emptyList()
@@ -180,13 +180,13 @@ public class SavepointV2 implements Savepoint {
Preconditions.checkArgument(
jobVertex.getParallelism() == taskState.getParallelism(),
- "Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() +"." +
+ "Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() + "." +
"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
"to the parallelism of stateful operators.");
Preconditions.checkArgument(
operatorIDs.size() == taskState.getChainLength(),
- "Detected change in chain length during migration for task " + jobVertex.getJobVertexId() +". " +
+ "Detected change in chain length during migration for task " + jobVertex.getJobVertexId() + ". " +
"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
"changed by modification of a chain containing a stateful operator.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index fb942f7..593bcf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -26,11 +26,11 @@ import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -54,15 +54,15 @@ import java.util.UUID;
/**
* (De)serializer for checkpoint metadata format version 2.
- *
+ *
* <p>This format version adds
- *
+ *
* <p>Basic checkpoint metadata layout:
* <pre>
* +--------------+---------------+-----------------+
* | checkpointID | master states | operator states |
* +--------------+---------------+-----------------+
- *
+ *
* Master state:
* +--------------+---------------------+---------+------+---------------+
* | magic number | num remaining bytes | version | name | payload bytes |
@@ -73,7 +73,7 @@ import java.util.UUID;
@VisibleForTesting
public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
- /** Random magic number for consistency checks */
+ /** Random magic number for consistency checks. */
private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
private static final byte NULL_HANDLE = 0;
@@ -83,12 +83,12 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
- /** The singleton instance of the serializer */
+ /** The singleton instance of the serializer. */
public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
// ------------------------------------------------------------------------
- /** Singleton, not meant to be instantiated */
+ /** Singleton, not meant to be instantiated. */
private SavepointV2Serializer() {}
// ------------------------------------------------------------------------
@@ -295,7 +295,7 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
// for compatibility, do not remove
int len = dis.readInt();
- if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) {
+ if (SavepointSerializers.failWhenLegacyStateDetected) {
Preconditions.checkState(len == 0,
"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " +
"no longer supported starting from Flink 1.4. Please rewrite your job to use " +
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 7de66ef..05d20dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -131,7 +131,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
@Override
public int read(byte[] b, int off, int len) throws IOException {
- // note that any bounds checking on "byte[] b" happend anyways by the
+ // note that any bounds checking on "byte[] b" happened anyways by the
// System.arraycopy() call below, so we don't add extra checks here
final int bytesLeft = data.length - index;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
index 602390b..abe1ae1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
+
import org.junit.Test;
import java.io.DataInputStream;
@@ -37,7 +38,7 @@ import java.util.Random;
import static org.junit.Assert.assertEquals;
/**
- * Various tests for the version 2 format serializer of a checkpoint.
+ * Various tests for the version 2 format serializer of a checkpoint.
*/
public class SavepointV2SerializerTest {
@@ -65,7 +66,7 @@ public class SavepointV2SerializerTest {
final Collection<OperatorState> operatorStates = Collections.emptyList();
final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1;
- final Collection<MasterState> masterStates =
+ final Collection<MasterState> masterStates =
CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
testCheckpointSerialization(checkpointId, operatorStates, masterStates);
@@ -83,7 +84,7 @@ public class SavepointV2SerializerTest {
final int numTasks = rnd.nextInt(maxTaskStates) + 1;
final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
- final Collection<OperatorState> taskStates =
+ final Collection<OperatorState> taskStates =
CheckpointTestUtils.createOperatorStates(rnd, numTasks, numSubtasks);
final Collection<MasterState> masterStates = Collections.emptyList();
@@ -139,8 +140,7 @@ public class SavepointV2SerializerTest {
assertEquals(masterStates.size(), deserialized.getMasterStates().size());
for (Iterator<MasterState> a = masterStates.iterator(), b = deserialized.getMasterStates().iterator();
- a.hasNext();)
- {
+ a.hasNext();) {
CheckpointTestUtils.assertMasterStateEquality(a.next(), b.next());
}
}
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index cf985a9..87e3319 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -24,12 +24,8 @@ under the License.
<suppressions>
<suppress
- files="(.*)runtime[/\\]checkpoint[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...]
- <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
- <suppress
files="(.*)test[/\\](.*)runtime[/\\]checkpoint[/\\](.*)"
- checks="AvoidStarImport|FileLength|UnusedImports|NeedBraces"/>
+ checks="AvoidStarImport|FileLength|UnusedImports|NeedBraces|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|Whites [...]
<suppress
files="(.*)runtime[/\\]client[/\\](.*)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...]