Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/17 08:55:13 UTC

[GitHub] [flink] zhuzhurk opened a new pull request, #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

zhuzhurk opened a new pull request, #19747:
URL: https://github.com/apache/flink/pull/19747

   ## What is the purpose of the change
   
   A meaningful ExecutionAttemptID helps in many places: it improves log readability and provides an easy way to find the corresponding execution vertex.
   
   
   ## Brief change log
   
     - *Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber(int type)*
     - *Refactor the usages of ExecutionAttemptID to ensure param consistency*
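
To illustrate the idea of a composite ID, here is a minimal sketch. It is not the class from this PR: `CompositeAttemptId` is a hypothetical name, and plain strings stand in for `ExecutionGraphID` and `ExecutionVertexID` so that the example is self-contained. The `equals`, `hashCode` and `toString` bodies follow the shape of the diff quoted in the review comments.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the refactored ID. Plain strings replace
// ExecutionGraphID and ExecutionVertexID so the sketch compiles on its own.
final class CompositeAttemptId {
    private final String executionGraphId;
    private final String executionVertexId;
    private final int attemptNumber;

    CompositeAttemptId(String executionGraphId, String executionVertexId, int attemptNumber) {
        // Null checks are an assumption of this sketch.
        this.executionGraphId = Objects.requireNonNull(executionGraphId);
        this.executionVertexId = Objects.requireNonNull(executionVertexId);
        this.attemptNumber = attemptNumber;
    }

    // The vertex and the attempt number can be read straight off the ID.
    String getExecutionVertexId() {
        return executionVertexId;
    }

    int getAttemptNumber() {
        return attemptNumber;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        CompositeAttemptId that = (CompositeAttemptId) obj;
        return that.executionGraphId.equals(this.executionGraphId)
                && that.executionVertexId.equals(this.executionVertexId)
                && that.attemptNumber == this.attemptNumber;
    }

    @Override
    public int hashCode() {
        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
    }

    @Override
    public String toString() {
        return String.format("%s_%s_%d", executionGraphId, executionVertexId, attemptNumber);
    }
}
```

Two IDs built from the same three parts compare equal, which is what allows an ID to be reconstructed on another process rather than looked up.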
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   Manual e2e tests are run against this change.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (**yes** / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875604974


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   A log statement that prints the ExecutionGraphID has been added.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875505631


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   > It might also make sense to return a more structured representation that actually tells the reader what they are looking at.
   
   Agreed. Actually, I'm planning to open a separate JIRA ticket and PR to refine the logs that contain an `ExecutionAttemptID`.
   Currently, an execution is usually represented as "`job vertex name` (`subtaskIndex+1`/`vertex parallelism`) (`attemptId`)", which may become redundant after this refactoring. I'm planning to change the format to "`job vertex name` (`short ExecutionGraphID`:`JobVertexID`) (`subtaskIndex+1`/`vertex parallelism`) (`#attemptNumber`)" and to avoid displaying the `ExecutionAttemptID` directly. The displayed `JobVertexID` can also help distinguish job vertices of the same name, which are common in DataStream jobs (e.g. multiple `Map` vertices).
   
   The logs are spread across multiple classes and need some further examination. Therefore, I'd like to remove the current `ExecutionAttemptID#getLogString()` and do this work in a separate task.
   
   WDYT?
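
The proposed log format can be sketched as a small formatting helper. Everything here is illustrative: `ExecutionLogFormat` and `describe` are hypothetical names, the IDs are passed as plain strings, and shortening the graph ID to four characters is an assumption borrowed from the `substring(0, 4)` call in the quoted diff.

```java
// Hypothetical helper; the class name, method name and parameters are illustrative only.
final class ExecutionLogFormat {

    private ExecutionLogFormat() {}

    static String describe(
            String jobVertexName,
            String executionGraphId,
            String jobVertexId,
            int subtaskIndex,
            int parallelism,
            int attemptNumber) {
        // Assumption: the "short" graph ID is the first four characters of the full ID.
        String shortGraphId =
                executionGraphId.substring(0, Math.min(4, executionGraphId.length()));
        // subtaskIndex is zero-based, so it is shown as subtaskIndex + 1.
        return String.format(
                "%s (%s:%s) (%d/%d) (#%d)",
                jobVertexName,
                shortGraphId,
                jobVertexId,
                subtaskIndex + 1,
                parallelism,
                attemptNumber);
    }
}
```

With this shape, two `Map` vertices in the same job would differ in the `JobVertexID` part of the output even though their names are identical.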





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875604268


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   > Can we put the work of introducing EG ID into a separate commit?
   
   Sure. A separate commit "[Introduce ExecutionGraph](https://github.com/apache/flink/pull/19747/commits/cb37f0a552595b35ce4e64e9233eabefc618fcb4)" is added.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875518397


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##########
@@ -44,7 +47,12 @@ public final class ResultPartitionID implements Serializable {
 
     @VisibleForTesting
     public ResultPartitionID() {
-        this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+        this(
+                new IntermediateResultPartitionID(),
+                new ExecutionAttemptID(
+                        new ExecutionGraphID(),
+                        new ExecutionVertexID(new JobVertexID(0, 0), 0),
+                        0));

Review Comment:
   I'm thinking of adding a factory method `ExecutionAttemptID#createDummyExecutionAttemptID()` to make it explicit that the returned ID is a dummy one; the method could then be used in production code.
   For more details, see my other [comment](https://github.com/apache/flink/pull/19747#discussion_r875516319).





[GitHub] [flink] KarmaGYZ commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
KarmaGYZ commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875506253


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -19,47 +19,96 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts happen in cases of
  * failures and recovery.
  */
 public class ExecutionAttemptID implements java.io.Serializable {
 
     private static final long serialVersionUID = -1169683445778281344L;

Review Comment:
   According to the code style guide[1], we should change it to "1L".
   
   [1] https://flink.apache.org/contributing/code-style-and-quality-java.html 
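
As a minimal sketch of what the suggested change looks like on a serializable ID class (`SketchGraphId` and its fields are hypothetical and only stand in for the classes under review):

```java
import java.io.Serializable;

// Hypothetical ID class; only the serialVersionUID declaration is the point here.
final class SketchGraphId implements Serializable {

    // A new class starts at 1L instead of using a generated hash value.
    private static final long serialVersionUID = 1L;

    private final long lowerPart;
    private final long upperPart;

    SketchGraphId(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    long getLowerPart() {
        return lowerPart;
    }

    long getUpperPart() {
        return upperPart;
    }
}
```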



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   Can we put the work of introducing EG ID into a separate commit?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphID.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/** A class for statistically unique execution graph IDs. */
+public class ExecutionGraphID extends AbstractID {
+
+    private static final long serialVersionUID = -1084434764056797648L;

Review Comment:
   Ditto. Should be 1L.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java:
##########
@@ -29,6 +29,13 @@
 
 /** Id identifying {@link ExecutionVertex}. */
 public class ExecutionVertexID implements VertexID {
+

Review Comment:
   `serialVersionUID` is missing.





[GitHub] [flink] zentol commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r876742900


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -48,11 +47,6 @@ public class ExecutionAttemptID implements java.io.Serializable {
 
     private final int attemptNumber;
 
-    @VisibleForTesting
-    public ExecutionAttemptID() {

Review Comment:
   👍 





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r877719574


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -364,6 +373,8 @@ public DefaultExecutionGraph(
         this.resultPartitionsById = new HashMap<>();
 
         this.isDynamic = isDynamic;
+
+        LOG.info("Created execution graph {}.", executionGraphId);

Review Comment:
   done.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r877718674


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -364,6 +373,8 @@ public DefaultExecutionGraph(
         this.resultPartitionsById = new HashMap<>();
 
         this.isDynamic = isDynamic;
+
+        LOG.info("Created execution graph {}.", executionGraphId);

Review Comment:
   Agreed.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875506238


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1420,6 +1422,13 @@ public void testRequestPartitionState() throws Exception {
         }
     }
 
+    private static ExecutionAttemptID deepCopyExecutionAttemptId(ExecutionAttemptID toCopy)
+            throws IOException, ClassNotFoundException {
+        return InstantiationUtil.deserializeObject(
+                InstantiationUtil.serializeObject(toCopy),
+                ExecutionGraphTestUtils.class.getClassLoader());
+    }
+

Review Comment:
   Good idea! I didn't realize there is an `InstantiationUtil#clone`. Will remove this method and use it directly.
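
For readers unfamiliar with the technique, the helper in the diff above deep-copies an object through a serialization round trip. A minimal sketch of the same idea using only `java.io` follows; `SerializationCopy` is a hypothetical name and this is not Flink's utility. Wrapping the checked exceptions in an unchecked one is a simplification made for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper that copies an object by serializing and deserializing it.
final class SerializationCopy {

    private SerializationCopy() {}

    static <T extends Serializable> T deepCopy(T original) {
        try {
            // Step 1: write the object graph into an in-memory byte array.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            // Step 2: read it back, which yields a distinct but equal instance.
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            // Simplification for this sketch: rethrow as an unchecked exception.
            throw new IllegalStateException("Could not copy object", e);
        }
    }
}
```

A copy produced this way is useful in tests that must show an ID is compared by value rather than by reference.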





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875493129


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   > logging the EG ID is fine, but this can't be done in isolation. There must be a way to correlate this id with a specific job submission; for example we could log the EG ID when the EG for a given job was created.
   
   Totally agreed. Will add a log when creating the EG.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r877716691


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   FLINK-27710 has been created to refine the representation of Execution.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875602647


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java:
##########
@@ -29,6 +29,13 @@
 
 /** Id identifying {@link ExecutionVertex}. */
 public class ExecutionVertexID implements VertexID {
+

Review Comment:
   Added in a hotfix commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphID.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+/** A class for statistically unique execution graph IDs. */
+public class ExecutionGraphID extends AbstractID {
+
+    private static final long serialVersionUID = -1084434764056797648L;

Review Comment:
   Fixed.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r877718479


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##########
@@ -44,7 +47,12 @@ public final class ResultPartitionID implements Serializable {
 
     @VisibleForTesting
     public ResultPartitionID() {
-        this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+        this(
+                new IntermediateResultPartitionID(),
+                new ExecutionAttemptID(
+                        new ExecutionGraphID(),
+                        new ExecutionVertexID(new JobVertexID(0, 0), 0),
+                        0));

Review Comment:
   Ok. I've added an `ExecutionAttemptID#randomId()`.





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875516319


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -48,11 +47,6 @@ public class ExecutionAttemptID implements java.io.Serializable {
 
     private final int attemptNumber;
 
-    @VisibleForTesting
-    public ExecutionAttemptID() {

Review Comment:
   My major concern is that someone may use it without knowing that a mocked `jobVertexId`, `subtaskIndex` and `attemptNumber` are used, which may be inconsistent with the actual execution.
   This constructor is seldom used in production, and only as a placeholder. How about adding an `ExecutionAttemptID#createDummyExecutionAttemptID()` method to explicitly show that it is a dummy one? It can be used in production.
   For test cases, I prefer to keep the factory methods in `ExecutionGraphTestUtils` to avoid cluttering production code for testing purposes.
   WDYT?
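
One way to picture the suggestion is a named static factory that makes the placeholder nature visible at the call site. The sketch below is hypothetical: `PlaceholderAttemptId` stands in for the real class, the IDs are plain strings, and the choice to randomize only the graph part while fixing the vertex and attempt parts is an assumption, not something stated in this thread.

```java
import java.util.UUID;

// Hypothetical stand-in for the ID class; strings replace the real ID types.
final class PlaceholderAttemptId {
    private final String graphId;
    private final String vertexId;
    private final int attemptNumber;

    PlaceholderAttemptId(String graphId, String vertexId, int attemptNumber) {
        this.graphId = graphId;
        this.vertexId = vertexId;
        this.attemptNumber = attemptNumber;
    }

    // A named factory signals that vertex and attempt are placeholders, which a
    // no-argument constructor would not. Assumption: only the graph part is random.
    static PlaceholderAttemptId randomId() {
        return new PlaceholderAttemptId(UUID.randomUUID().toString(), "placeholder-vertex_0", 0);
    }

    String getGraphId() {
        return graphId;
    }

    String getVertexId() {
        return vertexId;
    }

    int getAttemptNumber() {
        return attemptNumber;
    }
}
```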





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875602211


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -19,47 +19,96 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts happen in cases of
  * failures and recovery.
  */
 public class ExecutionAttemptID implements java.io.Serializable {
 
     private static final long serialVersionUID = -1169683445778281344L;

Review Comment:
   Ok. Fixed.





[GitHub] [flink] zentol commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r874580409


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -428,6 +428,18 @@ public static Execution getExecution(
         return ejv.getTaskVertices()[subtaskIndex].getCurrentExecutionAttempt();
     }
 
+    public static ExecutionAttemptID createExecutionAttemptId() {
+        return createExecutionAttemptId(new JobVertexID(0, 0), 0, 0);
+    }
+
+    public static ExecutionAttemptID createExecutionAttemptId(
+            JobVertexID jobVertexId, int subtaskIndex, int attemptNumber) {
+        return new ExecutionAttemptID(
+                new ExecutionGraphID(),
+                new ExecutionVertexID(jobVertexId, subtaskIndex),
+                attemptNumber);
+    }

Review Comment:
   I'd rather have these factory methods in the ExecutionAttemptID class so that they are available everywhere.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   Logging the EG ID is fine, but this can't be done in isolation. There must be a way to correlate this ID with a specific job submission; for example, we could log the EG ID when the EG for a given job is created.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##########
@@ -44,7 +47,12 @@ public final class ResultPartitionID implements Serializable {
 
     @VisibleForTesting
     public ResultPartitionID() {
-        this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+        this(
+                new IntermediateResultPartitionID(),
+                new ExecutionAttemptID(
+                        new ExecutionGraphID(),
+                        new ExecutionVertexID(new JobVertexID(0, 0), 0),
+                        0));

Review Comment:
   We could add a convenience constructor to avoid these changes.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -48,11 +47,6 @@ public class ExecutionAttemptID implements java.io.Serializable {
 
     private final int attemptNumber;
 
-    @VisibleForTesting
-    public ExecutionAttemptID() {

Review Comment:
   I'd prefer to keep this.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1420,6 +1422,13 @@ public void testRequestPartitionState() throws Exception {
         }
     }
 
+    private static ExecutionAttemptID deepCopyExecutionAttemptId(ExecutionAttemptID toCopy)
+            throws IOException, ClassNotFoundException {
+        return InstantiationUtil.deserializeObject(
+                InstantiationUtil.serializeObject(toCopy),
+                ExecutionGraphTestUtils.class.getClassLoader());
+    }
+

Review Comment:
   Why aren't you directly using `InstantiationUtil#clone`?
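
For context, the helper in the diff is a serialize-then-deserialize round trip. The sketch below shows that technique with plain `java.io` only; it is not Flink's implementation, and `Id` and `deepCopy` are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Sketch only: deep copy of a Serializable value via Java serialization. */
class SerializationCloneSketch {

    /** Hypothetical serializable ID used only to exercise deepCopy. */
    static final class Id implements Serializable {
        private static final long serialVersionUID = 1L;
        final String vertex;
        final int attempt;

        Id(String vertex, int attempt) {
            this.vertex = vertex;
            this.attempt = attempt;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Id)) {
                return false;
            }
            Id that = (Id) obj;
            return vertex.equals(that.vertex) && attempt == that.attempt;
        }

        @Override
        public int hashCode() {
            return Objects.hash(vertex, attempt);
        }
    }

    /** Writes the object to a byte array and reads it back as a new instance. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // Wrapped so callers do not have to handle checked exceptions.
            throw new IllegalStateException("Could not copy object", e);
        }
    }

    public static void main(String[] args) {
        Id original = new Id("vertex_0", 1);
        Id copy = deepCopy(original);
        // The copy is a distinct instance that is equal by value.
        System.out.println(copy.equals(original) && copy != original);
    }
}
```

A shared utility that already performs this round trip removes the need for a per-test helper, which appears to be the point of the question above.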



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   It might also make sense to return a more structured representation that actually tells the reader what they are looking at.
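
One way to read this suggestion, sketched with hypothetical stand-in types: keep the compact `%s_%s_%d` form from the diff and add a labelled form that names each component. The labels `graph`, `vertex` and `attempt`, and the method names, are assumptions for illustration only.

```java
/** Sketch only: compact versus labelled string forms of an attempt ID. */
class StructuredIdSketch {

    /** Hypothetical stand-in; plain strings replace ExecutionGraphID and ExecutionVertexID. */
    static final class AttemptId {
        final String graphId;
        final String vertexId;
        final int attemptNumber;

        AttemptId(String graphId, String vertexId, int attemptNumber) {
            this.graphId = graphId;
            this.vertexId = vertexId;
            this.attemptNumber = attemptNumber;
        }

        /** Same layout as the toString() in the diff: three parts joined by underscores. */
        String toCompactString() {
            return String.format("%s_%s_%d", graphId, vertexId, attemptNumber);
        }

        /** Labels each part so a reader does not have to know the field order. */
        String toStructuredString() {
            return String.format(
                    "graph=%s, vertex=%s, attempt=%d", graphId, vertexId, attemptNumber);
        }
    }

    public static void main(String[] args) {
        AttemptId id = new AttemptId("ab12", "v1_0", 2);
        System.out.println(id.toCompactString());
        System.out.println(id.toStructuredString());
    }
}
```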





[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875517505


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -428,6 +428,18 @@ public static Execution getExecution(
         return ejv.getTaskVertices()[subtaskIndex].getCurrentExecutionAttempt();
     }
 
+    public static ExecutionAttemptID createExecutionAttemptId() {
+        return createExecutionAttemptId(new JobVertexID(0, 0), 0, 0);
+    }
+
+    public static ExecutionAttemptID createExecutionAttemptId(
+            JobVertexID jobVertexId, int subtaskIndex, int attemptNumber) {
+        return new ExecutionAttemptID(
+                new ExecutionGraphID(),
+                new ExecutionVertexID(jobVertexId, subtaskIndex),
+                attemptNumber);
+    }

Review Comment:
   I prefer to keep them as test code to avoid cluttering production code for testing purposes. For some other concerns, see my other [comment](https://github.com/apache/flink/pull/19747#discussion_r875516319).





[GitHub] [flink] zentol commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r876773731


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -364,6 +373,8 @@ public DefaultExecutionGraph(
         this.resultPartitionsById = new HashMap<>();
 
         this.isDynamic = isDynamic;
+
+        LOG.info("Created execution graph {}.", executionGraphId);

Review Comment:
   This should also log the job ID. Let's have one line that couples JobID<->ExecutionGraphID, which removes the need to also read previous log messages.
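
A minimal sketch of such a single coupling line. It uses `java.util.logging` and plain strings so that it runs on its own; Flink logs through SLF4J, and the message wording and the `creationMessage` helper are assumptions for illustration.

```java
import java.util.UUID;
import java.util.logging.Logger;

/** Sketch only: one log line that ties an execution graph ID to its job ID. */
class GraphCreationLogSketch {

    private static final Logger LOG =
            Logger.getLogger(GraphCreationLogSketch.class.getName());

    /** Builds the message; both IDs appear on the same line so no earlier log is needed. */
    static String creationMessage(String jobId, String executionGraphId) {
        return String.format(
                "Created execution graph %s for job %s.", executionGraphId, jobId);
    }

    public static void main(String[] args) {
        // Random values stand in for a real JobID and ExecutionGraphID.
        String jobId = UUID.randomUUID().toString();
        String executionGraphId = UUID.randomUUID().toString();
        LOG.info(creationMessage(jobId, executionGraphId));
    }
}
```

With both IDs on one line, a later log entry that shows only the execution graph ID can be traced back to its job with a single search.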





[GitHub] [flink] zentol commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r876741988


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   > The logs are spread among multiple classes and need some further examination. Therefore I'd like to remove the current ExecutionAttemptID#getLogString() and do this work in a separate task.
   
   Sure, this sounds fine.





[GitHub] [flink] zentol commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r876741136


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java:
##########
@@ -44,7 +47,12 @@ public final class ResultPartitionID implements Serializable {
 
     @VisibleForTesting
     public ResultPartitionID() {
-        this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+        this(
+                new IntermediateResultPartitionID(),
+                new ExecutionAttemptID(
+                        new ExecutionGraphID(),
+                        new ExecutionVertexID(new JobVertexID(0, 0), 0),
+                        0));

Review Comment:
   You could call it `ExecutionAttemptID#randomId()`; it's a bit shorter :)





[GitHub] [flink] flinkbot commented on pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19747:
URL: https://github.com/apache/flink/pull/19747#issuecomment-1128604910

   ## CI report:
   
   * 4a9c8eb9174a5d21e44edadf075c915d662a7f2e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zhuzhurk commented on a diff in pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #19747:
URL: https://github.com/apache/flink/pull/19747#discussion_r875493129


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java:
##########
@@ -68,19 +117,32 @@ public boolean equals(Object obj) {
             return true;
         } else if (obj != null && obj.getClass() == getClass()) {
             ExecutionAttemptID that = (ExecutionAttemptID) obj;
-            return that.executionAttemptId.equals(this.executionAttemptId);
+            return that.executionGraphId.equals(this.executionGraphId)
+                    && that.executionVertexId.equals(this.executionVertexId)
+                    && that.attemptNumber == this.attemptNumber;
         } else {
             return false;
         }
     }
 
     @Override
     public int hashCode() {
-        return executionAttemptId.hashCode();
+        return Objects.hash(executionGraphId, executionVertexId, attemptNumber);
     }
 
     @Override
     public String toString() {
-        return executionAttemptId.toString();
+        return String.format(
+                "%s_%s_%d", executionGraphId.toString(), executionVertexId, attemptNumber);
+    }
+
+    public String getLogString() {
+        if (DefaultExecutionGraph.LOG.isDebugEnabled()) {
+            return toString();
+        } else {
+            return String.format(
+                    "%s_%s_%d",
+                    executionGraphId.toString().substring(0, 4), executionVertexId, attemptNumber);

Review Comment:
   > Logging the EG ID is fine, but this can't be done in isolation. There must be a way to correlate this ID with a specific job submission; for example, we could log the EG ID when the EG for a given job is created.
   
   Totally agree. Will add a log when creating the EG.





[GitHub] [flink] zhuzhurk merged pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk merged PR #19747:
URL: https://github.com/apache/flink/pull/19747




[GitHub] [flink] zhuzhurk commented on pull request #19747: [FLINK-17295][runtime] Refactor the ExecutionAttemptID to consist of ExecutionGraphID, ExecutionVertexID and attemptNumber

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #19747:
URL: https://github.com/apache/flink/pull/19747#issuecomment-1132699933

   Thanks for reviewing! @zentol @KarmaGYZ 
   Merging.

