You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/29 21:44:10 UTC
incubator-gobblin git commit: [GOBBLIN-205] Fix bug in pushmode
copyRoute generation Add some docs to help understand and maintain the code
Repository: incubator-gobblin
Updated Branches:
refs/heads/master d8b1a6609 -> b67fd71b1
[GOBBLIN-205] Fix bug in pushmode copyRoute generation
Add some docs to help understand and maintain the code
Closes #2057 from autumnust/replication-push
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b67fd71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b67fd71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b67fd71b
Branch: refs/heads/master
Commit: b67fd71b1ea6e3eb47a5ac16d192067fb0ad8a68
Parents: d8b1a66
Author: Lei Sun <au...@gmail.com>
Authored: Tue Aug 29 14:43:59 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Tue Aug 29 14:43:59 2017 -0700
----------------------------------------------------------------------
.../copy/replication/ConfigBasedMultiDatasets.java | 15 ++++++++++++++-
.../copy/replication/CopyRouteGeneratorBase.java | 7 +++++++
.../copy/replication/DataFlowTopology.java | 10 ++++++++++
.../copy/replication/ReplicationConfiguration.java | 2 ++
4 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
index 00374e9..3f9a57f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
@@ -132,14 +132,19 @@ public class ConfigBasedMultiDatasets {
Optional<List<CopyRoute>> copyRoutes = cpGen.getPushRoutes(rc, pushFrom);
if(!copyRoutes.isPresent()) {
log.warn("In Push mode, did not found any copyRoute for dataset with meta data {}", rc.getMetaData());
- return;
+ continue;
}
+ /**
+ * Loop responsibility:
+ * For each {@link CopyRoute} whose copyTo matches the push-mode target cluster, generate a {@link ConfigBasedDataset} (subject to blacklist filtering).
+ */
for(CopyRoute cr: copyRoutes.get()){
if(cr.getCopyTo() instanceof HadoopFsEndPoint){
HadoopFsEndPoint ep = (HadoopFsEndPoint)cr.getCopyTo();
if(ep.getFsURI().toString().equals(pushModeTargetCluster)){
+
// For a candidate dataset, iterate thru. all available blacklist patterns.
ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, cr);
if (blacklistFilteringHelper(configBasedDataset, this.blacklist)){
@@ -164,10 +169,18 @@ public class ConfigBasedMultiDatasets {
// PULL mode
CopyRouteGenerator cpGen = rc.getCopyRouteGenerator();
+ /**
+ * Replicas come from the list normally configured in gobblin.replicas.
+ * These are all the possible destinations for this replication job.
+ */
List<EndPoint> replicas = rc.getReplicas();
for(EndPoint replica: replicas){
// Only pull the data from current execution cluster
if(needGenerateCopyEntity(replica, executionClusterURI)){
+ /*
+ * A CopyRoute represents a copyable dataset to execute; e.g. if you specify source:[war, holdem],
+ * two {@link ConfigBasedDataset}s could be generated.
+ */
Optional<CopyRoute> copyRoute = cpGen.getPullRoute(rc, replica);
if(copyRoute.isPresent()){
ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, copyRoute.get());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
index d732fcf..da88a32 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/CopyRouteGeneratorBase.java
@@ -45,11 +45,18 @@ public class CopyRouteGeneratorBase implements CopyRouteGenerator {
List<DataFlowTopology.DataFlowPath> paths = topology.getDataFlowPaths();
for (DataFlowTopology.DataFlowPath p : paths) {
+ /**
+ * Routes are a list of pairs generated from the config, following the topology specification format.
+ * For example, source:[holdem, war] results in
+ * List<(source, holdem), (source, war)>
+ */
List<CopyRoute> routes = p.getCopyRoutes();
+
if (routes.isEmpty()) {
continue;
}
+ // All the routes should have the same copyFrom but different copyTo.
if (routes.get(0).getCopyFrom().equals(copyFrom)) {
return Optional.of(routes);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
index 9c8e15b..fc90044 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/DataFlowTopology.java
@@ -36,6 +36,16 @@ import lombok.AllArgsConstructor;;
*/
@Data
+/**
+ * Some explanation combined with the example in configStore:
+ *
+ * {@link DataFlowTopology} is the whole block specified in, for example,
+ * {@link ReplicationConfiguration#DEFAULT_DATA_FLOW_TOPOLOGIES_PULLMODE}; this is normally called the topology.
+ *
+ * Each element of {@link #dataFlowPaths} represents a line such as: tarock:[source,holdem]
+ *
+ * From {@link #dataFlowPaths} we can derive a list of {@link CopyRoute}s.
+ */
public class DataFlowTopology {
private List<DataFlowPath> dataFlowPaths = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b67fd71b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
index 2deafba..b2df7f1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicationConfiguration.java
@@ -343,6 +343,8 @@ public class ReplicationConfiguration {
else {
Set<String> currentCopyTo = new HashSet<>();
for (final Map.Entry<String, EndPoint> valid : validEndPoints.entrySet()) {
+
+ // Only generate copyRoutes from the EndPoint that is running this job.
if (routesConfig.hasPath(valid.getKey())) {
List<String> copyToStringsRaw = routesConfig.getStringList(valid.getKey());
List<String> copyToStrings = new ArrayList<>();