You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/09/19 19:23:59 UTC

[GitHub] [incubator-gobblin] sv2000 opened a new pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

sv2000 opened a new pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107


   
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1266
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   Dataset Lineage emission in current Gobblin code is only available for batch mode of execution. For instance, the lineage events are emitted on dataset commit, which is never invoked for long-running tasks. This PR refactors lineage related code so that it is leverageable in streaming mode of execution as well.
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Existing tests. This PR is purely re-factoring existing code to make it accessible in other classes.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492218836



##########
File path: gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
##########
@@ -411,7 +415,7 @@ public void handleMessage(ControlMessage message) {
   /**
    * Get the serialized key to partitions info in {@link #state}
    */
-  private static String getPartitionsKey(int branchId) {
+  public static String getPartitionsKey(int branchId) {

Review comment:
       This is needed in LinkedIn's internal publisher. Hence the change. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492218926



##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {

Review comment:
       same as above.

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {
+    String destinationKey = getDestinationKey(branchId);
+    if (state.contains(destinationKey)) {
+      state.removeProp(destinationKey);
+    }
+  }
+
+  /**
+   * Group states by lineage event name (i.e the dataset name). Used for de-duping LineageEvents for a given dataset.
+   * @param states
+   * @return a map of {@link WorkUnitState}s keyed by dataset name.
+   */
+  public static Map<String, Collection<WorkUnitState>> aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {

Review comment:
       Same comment as above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492198456



##########
File path: gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
##########
@@ -411,7 +415,7 @@ public void handleMessage(ControlMessage message) {
   /**
    * Get the serialized key to partitions info in {@link #state}
    */
-  private static String getPartitionsKey(int branchId) {
+  public static String getPartitionsKey(int branchId) {

Review comment:
       why changing to public ?

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {

Review comment:
       Where is this being used? 

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {
+    String destinationKey = getDestinationKey(branchId);
+    if (state.contains(destinationKey)) {
+      state.removeProp(destinationKey);
+    }
+  }
+
+  /**
+   * Group states by lineage event name (i.e the dataset name). Used for de-duping LineageEvents for a given dataset.
+   * @param states
+   * @return a map of {@link WorkUnitState}s keyed by dataset name.
+   */
+  public static Map<String, Collection<WorkUnitState>> aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {

Review comment:
       Is the signature change necessary ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] asfgit closed pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492198456



##########
File path: gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
##########
@@ -411,7 +415,7 @@ public void handleMessage(ControlMessage message) {
   /**
    * Get the serialized key to partitions info in {@link #state}
    */
-  private static String getPartitionsKey(int branchId) {
+  public static String getPartitionsKey(int branchId) {

Review comment:
       why changing to public ?

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {

Review comment:
       Where is this being used? 

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {
+    String destinationKey = getDestinationKey(branchId);
+    if (state.contains(destinationKey)) {
+      state.removeProp(destinationKey);
+    }
+  }
+
+  /**
+   * Group states by lineage event name (i.e the dataset name). Used for de-duping LineageEvents for a given dataset.
+   * @param states
+   * @return a map of {@link WorkUnitState}s keyed by dataset name.
+   */
+  public static Map<String, Collection<WorkUnitState>> aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {

Review comment:
       Is the signature change necessary ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492218836



##########
File path: gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
##########
@@ -411,7 +415,7 @@ public void handleMessage(ControlMessage message) {
   /**
    * Get the serialized key to partitions info in {@link #state}
    */
-  private static String getPartitionsKey(int branchId) {
+  public static String getPartitionsKey(int branchId) {

Review comment:
       This is needed in LinkedIn's internal publisher. Hence the change. 

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {

Review comment:
       same as above.

##########
File path: gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {
+    String destinationKey = getDestinationKey(branchId);
+    if (state.contains(destinationKey)) {
+      state.removeProp(destinationKey);
+    }
+  }
+
+  /**
+   * Group states by lineage event name (i.e the dataset name). Used for de-duping LineageEvents for a given dataset.
+   * @param states
+   * @return a map of {@link WorkUnitState}s keyed by dataset name.
+   */
+  public static Map<String, Collection<WorkUnitState>> aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {

Review comment:
       Same comment as above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on pull request #3107: GOBBLIN-1266: Refactor dataset lineage code to allow Lineage event emission in streaming mode

Posted by GitBox <gi...@apache.org>.
sv2000 commented on pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#issuecomment-695347189


   @zxcware @autumnust @ZihanLi58 Please review. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org