You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/29 21:44:10 UTC

incubator-gobblin git commit: [GOBBLIN-205] Fix bug in pushmode copyRoute generation Add some docs to help understand and maintain the code

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d8b1a6609 -> b67fd71b1


[GOBBLIN-205] Fix bug in pushmode copyRoute generation
Add some docs to help understand and maintain the code

Closes #2057 from autumnust/replication-push


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b67fd71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b67fd71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b67fd71b

Branch: refs/heads/master
Commit: b67fd71b1ea6e3eb47a5ac16d192067fb0ad8a68
Parents: d8b1a66
Author: Lei Sun <au...@gmail.com>
Authored: Tue Aug 29 14:43:59 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Tue Aug 29 14:43:59 2017 -0700

----------------------------------------------------------------------
 .../copy/replication/ConfigBasedMultiDatasets.java   | 15 ++++++++++++++-
 .../copy/replication/CopyRouteGeneratorBase.java     |  7 +++++++
 .../copy/replication/DataFlowTopology.java           | 10 ++++++++++
 .../copy/replication/ReplicationConfiguration.java   |  2 ++
 4 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
index 00374e9..3f9a57f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
@@ -132,14 +132,19 @@ public class ConfigBasedMultiDatasets {
         Optional<List<CopyRoute>> copyRoutes = cpGen.getPushRoutes(rc, pushFrom);
         if(!copyRoutes.isPresent()) {
           log.warn("In Push mode, did not found any copyRoute for dataset with meta data {}", rc.getMetaData());
-          return;
+          continue;
         }
 
+        /**
+         * For-Loop responsibility:
+         * For each of the {@link CopyRoute}, generate a {@link ConfigBasedDataset}.
+         */
         for(CopyRoute cr: copyRoutes.get()){
           if(cr.getCopyTo() instanceof HadoopFsEndPoint){
 
             HadoopFsEndPoint ep = (HadoopFsEndPoint)cr.getCopyTo();
             if(ep.getFsURI().toString().equals(pushModeTargetCluster)){
+
               // For a candidate dataset, iterate thru. all available blacklist patterns.
               ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, cr);
               if (blacklistFilteringHelper(configBasedDataset, this.blacklist)){
@@ -164,10 +169,18 @@ public class ConfigBasedMultiDatasets {
 
     // PULL mode
     CopyRouteGenerator cpGen = rc.getCopyRouteGenerator();
+    /**
+     * Replicas come from the list that is normally set in gobblin.replicas.
+     * Basically, these are all the possible destinations for this replication job.
+     */
     List<EndPoint> replicas = rc.getReplicas();
     for(EndPoint replica: replicas){
       // Only pull the data from current execution cluster
       if(needGenerateCopyEntity(replica, executionClusterURI)){
+        /*
+        * A CopyRoute represents a copyable dataset to execute, e.g. if you specify source:[war, holdem],
+        * there could be two {@link ConfigBasedDataset} instances generated.
+        */
         Optional<CopyRoute> copyRoute = cpGen.getPullRoute(rc, replica);
         if(copyRoute.isPresent()){
           ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, copyRoute.get());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
index d732fcf..da88a32 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
@@ -45,11 +45,18 @@ public class CopyRouteGeneratorBase implements CopyRouteGenerator {
     List<DataFlowTopology.DataFlowPath> paths = topology.getDataFlowPaths();
 
     for (DataFlowTopology.DataFlowPath p : paths) {
+      /**
+       * Routes are a list of pairs generated from the config, in the format of the topology specification.
+       * For example, source:[holdem, war] will end up as
+       * List<(source, holdem), (source, war)>
+       */
       List<CopyRoute> routes = p.getCopyRoutes();
+
       if (routes.isEmpty()) {
         continue;
       }
 
+      // All the routes should have the same copyFrom but different copyTo.
       if (routes.get(0).getCopyFrom().equals(copyFrom)) {
         return Optional.of(routes);
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
index 9c8e15b..fc90044 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
@@ -36,6 +36,16 @@ import lombok.AllArgsConstructor;;
  */
 
 @Data
+/**
+ * Some explanation combined with the example in configStore:
+ *
+ * {@link DataFlowTopology} is the whole block specified in,
+ * say, {@link ReplicationConfiguration#DEFAULT_DATA_FLOW_TOPOLOGIES_PULLMODE}; this is normally called the topology.
+ *
+ * Each entry of {@link #dataFlowPaths} represents a line like: tarock:[source,holdem]
+ *
+ * From {@link #dataFlowPaths} we can have a list of {@link CopyRoute}.
+ */
 public class DataFlowTopology {
 
   private List<DataFlowPath> dataFlowPaths = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index 2deafba..b2df7f1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -343,6 +343,8 @@ public class ReplicationConfiguration {
       else {
         Set<String> currentCopyTo = new HashSet<>();
         for (final Map.Entry<String, EndPoint> valid : validEndPoints.entrySet()) {
+
+          // Only generate copyRoute from the EndPoint that is running this job.
           if (routesConfig.hasPath(valid.getKey())) {
             List<String> copyToStringsRaw = routesConfig.getStringList(valid.getKey());
             List<String> copyToStrings = new ArrayList<>();