You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/17 03:12:58 UTC

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11873: [FLINK-17295] Refactor the ExecutionAttemptID to consist of Execution…

KarmaGYZ commented on a change in pull request #11873:
URL: https://github.com/apache/flink/pull/11873#discussion_r471210685



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
##########
@@ -18,33 +18,77 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Unique identifier for the attempt to execute a task. Multiple attempts happen
  * in cases of failures and recovery.
  */
-public class ExecutionAttemptID extends AbstractID {
+public class ExecutionAttemptID implements java.io.Serializable {
 
 	private static final long serialVersionUID = -1169683445778281344L;
 
+	private final ExecutionVertexID executionVertexID;
+	private final int attemptNumber;
+
+	/**
+	 * Get a random execution attempt id.
+	 */
 	public ExecutionAttemptID() {
+		this(new ExecutionVertexID(), 0);
 	}
 
-	public ExecutionAttemptID(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
+	public ExecutionAttemptID(ExecutionVertexID executionVertexID, int attemptNumber) {
+		Preconditions.checkState(attemptNumber >= 0);
+		this.executionVertexID = Preconditions.checkNotNull(executionVertexID);
+		this.attemptNumber = attemptNumber;
 	}
 
 	public void writeTo(ByteBuf buf) {
-		buf.writeLong(this.lowerPart);
-		buf.writeLong(this.upperPart);
+		executionVertexID.writeTo(buf);
+		buf.writeInt(this.attemptNumber);
 	}
 
 	public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
-		long lower = buf.readLong();
-		long upper = buf.readLong();
-		return new ExecutionAttemptID(lower, upper);
+		final ExecutionVertexID executionVertexID = ExecutionVertexID.fromByteBuf(buf);
+		final int attemptNumber = buf.readInt();
+		return new ExecutionAttemptID(executionVertexID, attemptNumber);
+	}
+
+	@VisibleForTesting
+	public int getAttemptNumber() {
+		return attemptNumber;
+	}
+
+	@VisibleForTesting
+	public ExecutionVertexID getExecutionVertexID() {
+		return executionVertexID;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		} else if (obj != null && obj.getClass() == getClass()) {
+			ExecutionAttemptID that = (ExecutionAttemptID) obj;
+			return that.executionVertexID.equals(this.executionVertexID)
+				&& that.attemptNumber == this.attemptNumber;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return this.executionVertexID.hashCode() ^ this.attemptNumber;

Review comment:
       Good point!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org