You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/09 20:50:35 UTC

[1/3] git commit: FALCON-695 Lineage: "stored-in" edge is added between feed entity and target cluster. Contributed by Sowmya Ramesh

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 5766b74ae -> 2e3eebdff


FALCON-695 Lineage: "stored-in" edge is added between feed entity and target cluster. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e59bef76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e59bef76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e59bef76

Branch: refs/heads/master
Commit: e59bef7626284efec5d0acdb43edfc5806a7d6a4
Parents: 5766b74
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 9 11:47:49 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 9 11:47:49 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |  3 +++
 .../metadata/EntityRelationshipGraphBuilder.java   | 17 ++++++++++++-----
 2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e59bef76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a3bbc4..757dd99 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-695 Lineage: "stored-in" edge is added between feed entity and
+   target cluster (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-669 Missing optional workflow execution listeners configuration
    results in NPE (Raghav Kumar Gautam via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e59bef76/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 1b7a068..c22bcdb 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -22,6 +22,7 @@ import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
@@ -66,7 +67,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         addGroups(feed.getGroups(), feedVertex);
 
         for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-            addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
+            if (ClusterType.TARGET != feedCluster.getType()) {
+                addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
+            }
         }
     }
 
@@ -264,14 +267,18 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
         // remove edges to old clusters
         for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) {
-            removeEdge(feedEntityVertex, oldCuster.getName(),
-                    RelationshipLabel.FEED_CLUSTER_EDGE.getName());
+            if (ClusterType.TARGET != oldCuster.getType()) {
+                removeEdge(feedEntityVertex, oldCuster.getName(),
+                        RelationshipLabel.FEED_CLUSTER_EDGE.getName());
+            }
         }
 
         // add edges to new clusters
         for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) {
-            addRelationToCluster(feedEntityVertex, newCluster.getName(),
-                    RelationshipLabel.FEED_CLUSTER_EDGE);
+            if (ClusterType.TARGET != newCluster.getType()) {
+                addRelationToCluster(feedEntityVertex, newCluster.getName(),
+                        RelationshipLabel.FEED_CLUSTER_EDGE);
+            }
         }
     }
 


[2/3] git commit: FALCON-694 StringIndexOutOfBoundsException while updating graph DB for replicated instance. Contributed by Sowmya Ramesh

Posted by ve...@apache.org.
FALCON-694 StringIndexOutOfBoundsException while updating graph DB for replicated instance. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e9e849e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e9e849e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e9e849e7

Branch: refs/heads/master
Commit: e9e849e73fd5478c943f1b43daac76f8f8f232b1
Parents: e59bef7
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 9 11:49:21 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 9 11:49:21 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../InstanceRelationshipGraphBuilder.java       |   5 +-
 .../workflow/WorkflowExecutionContext.java      |  22 +---
 .../metadata/MetadataMappingServiceTest.java    | 131 +++++++++++++++----
 .../feed/FeedReplicationCoordinatorBuilder.java |   4 -
 .../feed/OozieFeedWorkflowBuilderTest.java      |   6 +-
 .../falcon/oozie/process/AbstractTestBase.java  |  13 +-
 .../cluster/util/EntityBuilderTestUtil.java     |  21 ++-
 8 files changed, 130 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 757dd99..4884512 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-694 StringIndexOutOfBoundsException while updating graph DB for
+   replicated instance (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-695 Lineage: "stored-in" edge is added between feed entity and
    target cluster (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 2f9fe8e..4d9fbcf 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -185,16 +185,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         String[] outputFeedNames = context.getOutputFeedNamesList();
         String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
         String targetClusterName = context.getClusterName();
-        String srcClusterName = context.getSrcClusterName();
 
         // For replication there will be only one output feed name
         String feedName = outputFeedNames[0];
         String feedInstanceDataPath = outputFeedInstancePaths[0];
 
         LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                + feedInstanceDataPath + ", in cluster: " + srcClusterName);
+                + feedInstanceDataPath + ", in cluster: " + targetClusterName);
         RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
-        String feedInstanceName = getFeedInstanceName(feedName, srcClusterName,
+        String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index c074484..9c7b395 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,7 +54,6 @@ public class WorkflowExecutionContext {
 
     public static final String OUTPUT_FEED_SEPARATOR = ",";
     public static final String INPUT_FEED_SEPARATOR = "#";
-    public static final String CLUSTER_NAME_SEPARATOR = ",";
 
     /**
      * Workflow execution status.
@@ -161,26 +160,7 @@ public class WorkflowExecutionContext {
     }
 
     public String getClusterName() {
-        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        return value.split(CLUSTER_NAME_SEPARATOR)[0];
-    }
-
-    public String getSrcClusterName() {
-        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
-        if (parts.length != 2) {
-            throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
-        }
-
-        return parts[1];
+        return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
     }
 
     public String getEntityName() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 7b73a91..3b9fdba 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
@@ -92,9 +93,8 @@ public class MetadataMappingServiceTest {
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-    private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
-            "jail://global:00/falcon/imp-click-join1/20140101";
-    private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
+    private static final String REPLICATED_INSTANCE = "raw-click";
+    private static final String EVICTED_INSTANCE_PATHS =
             "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
     public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
             "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
@@ -273,24 +273,33 @@ public class MetadataMappingServiceTest {
     public void  testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
-        String feedName = "imp-click-join1";
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-            EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, feedName,
-            REPLICATED_OUTPUT_INSTANCE_PATHS, null, null), WorkflowExecutionContext.Type.POST_PROCESSING);
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+                        "jail://global:00/falcon/raw-click/bcp/20140101",
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         debug(service.getGraph());
         GraphUtils.dump(service.getGraph());
-        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
 
-        verifyLineageGraphForReplicationOrEviction(feedName, REPLICATED_OUTPUT_INSTANCE_PATHS, context,
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+                "jail://global:00/falcon/raw-click/bcp/20140101", context,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
 
-        // +3 = cluster, colo, tag
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 26);
-        // +3 = +2 edges for bcp cluster, no user but only to colo and new tag  + 1 for replicated-to edge to target
-        // cluster for each output feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
+        // +6 [primary, bcp cluster] = cluster, colo, tag,
+        // +4 [input feed] = feed, tag, group, user
+        // +4 [output feed] = 1 feed + 1 tag + 2 groups
+        // +4 [process] = 1 process + 1 tag + 2 pipeline
+        // +3 = 1 process, 1 input, 1 output
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
+        // +4 [cluster] = cluster to colo and tag [primary and bcp],
+        // +4 [input feed] = cluster, tag, group, user
+        // +5 [output feed] = cluster + user + Group + 2Tags
+        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
+        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+        // +1 for replicated-to edge to target cluster for each output feed instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 40);
     }
 
     @Test
@@ -313,7 +322,7 @@ public class MetadataMappingServiceTest {
         List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
                 "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
-        String[] paths = EVICTED_OUTPUT_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
         for (String feedInstanceDataPath : paths) {
             verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
                     RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
@@ -481,10 +490,20 @@ public class MetadataMappingServiceTest {
     }
 
     private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
-                              Storage.TYPE storageType, String uriTemplate) throws Exception {
-        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, cluster,
+                               Storage.TYPE storageType, String uriTemplate) throws Exception {
+        return addFeedEntity(feedName, new Cluster[]{cluster}, tags, groups, storageType, uriTemplate);
+    }
+
+    private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups,
+                               Storage.TYPE storageType, String uriTemplate) throws Exception {
+        Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters,
                 tags, groups);
         addStorage(feed, storageType, uriTemplate);
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) {
+                feedCluster.setType(ClusterType.TARGET);
+            }
+        }
         configStore.publish(EntityType.FEED, feed);
         return feed;
     }
@@ -524,6 +543,24 @@ public class MetadataMappingServiceTest {
         }
     }
 
+    private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed,
+                                   Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            cluster.setLocations(new Locations());
+            cluster.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            cluster.setTable(table);
+        }
+    }
+
     private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) {
         Vertex entityVertex = getEntityVertex(entityName, entityType);
         Assert.assertNotNull(entityVertex);
@@ -759,7 +796,7 @@ public class MetadataMappingServiceTest {
                                                             WorkflowExecutionContext context,
                                                             RelationshipLabel edgeLabel) throws Exception {
         String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
-                , context.getSrcClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
+                , context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
         Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
 
         Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
@@ -777,7 +814,7 @@ public class MetadataMappingServiceTest {
                                                String falconInputFeeds) {
         String cluster;
         if (EntityOperations.REPLICATE == operation) {
-            cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
+            cluster = BCP_CLUSTER_ENTITY_NAME;
         } else {
             cluster = CLUSTER_ENTITY_NAME;
         }
@@ -858,14 +895,58 @@ public class MetadataMappingServiceTest {
     }
 
     private void setupForLineageReplication() throws Exception {
-        setup();
+        cleanUp();
+        service.init();
+
+        // Add cluster
+        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
+        // Add backup cluster
+        Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
+
+        Cluster[] clusters = {clusterEntity, bcpCluster};
+
+        // Add feed
+        Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+                "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+                "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
+        // Add uri template for each cluster
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : rawFeed.getClusters().getClusters()) {
+            if (feedCluster.getName().equals(CLUSTER_ENTITY_NAME)) {
+                addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
+                        "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}");
+            } else {
+                addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
+                        "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}");
+            }
+        }
+
+        // update config store
+        try {
+            configStore.initiateUpdate(rawFeed);
+            configStore.update(EntityType.FEED, rawFeed);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+        inputFeeds.add(rawFeed);
+
+        // Add output feed
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join1Feed);
+
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
+
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-            EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
-            , WorkflowExecutionContext.Type.POST_PROCESSING);
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+                "jail://global:00/falcon/imp-click-join1/20140101",
+                "jail://global:00/falcon/raw-click/primary/20140101",
+                REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
-        // Add backup cluster
-        addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
     }
 
     private void setupForLineageEviciton() throws Exception {
@@ -884,12 +965,12 @@ public class MetadataMappingServiceTest {
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
-                        "imp-click-join1,imp-click-join1", EVICTED_OUTPUT_INSTANCE_PATHS, null, null),
+                        "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         // Write to csv file
-        String csvData = EVICTED_OUTPUT_INSTANCE_PATHS;
+        String csvData = EVICTED_INSTANCE_PATHS;
         logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
         Path path = new Path(logFilePath);
         EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 5697eb6..966f90e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,10 +159,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
         workflow.setAppPath(getStoragePath(buildPath));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
-        // Override CLUSTER_NAME property to include both source and target cluster
-        String clusterProperty = trgCluster.getName()
-                + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
-        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
         if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 379cf34..3c49353 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        verifyEntityProperties(feed, trgCluster, srcCluster,
+        verifyEntityProperties(feed, trgCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
 
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
 
-        verifyEntityProperties(aFeed, aCluster, srcCluster,
+        verifyEntityProperties(aFeed, aCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
                 wfPath.toString());
 
-        verifyEntityProperties(tableFeed, trgCluster, srcCluster,
+        verifyEntityProperties(tableFeed, trgCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index 4e260e9..b547c31 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -217,22 +217,11 @@ public class AbstractTestBase {
     protected void verifyEntityProperties(Entity entity, Cluster cluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception {
-        verifyEntityProperties(entity, cluster, null, operation, props);
-    }
-
-    protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
-                                          WorkflowExecutionContext.EntityOperations operation,
-                                          HashMap<String, String> props) throws Exception {
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 entity.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
                 entity.getEntityType().name());
-        if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
-            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
-                    cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
-        } else {
-            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
-        }
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
         Assert.assertEquals(props.get("falconDataOperation"), operation.name());
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
index b13ec08..66ceb37 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
@@ -74,7 +74,7 @@ public final class EntityBuilderTestUtil {
         return cluster;
     }
 
-    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups) {
+    public static Feed buildFeed(String feedName, Cluster[] clusters, String tags, String groups) {
         Feed feed = new Feed();
         feed.setName(feedName);
         feed.setTags(tags);
@@ -82,12 +82,15 @@ public final class EntityBuilderTestUtil {
         feed.setFrequency(Frequency.fromString("hours(1)"));
 
         org.apache.falcon.entity.v0.feed.Clusters
-                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
-        feed.setClusters(clusters);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName(cluster.getName());
-        clusters.getClusters().add(feedCluster);
+                feedClusters = new org.apache.falcon.entity.v0.feed.Clusters();
+        feed.setClusters(feedClusters);
+
+        for (Cluster cluster : clusters) {
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                    new org.apache.falcon.entity.v0.feed.Cluster();
+            feedCluster.setName(cluster.getName());
+            feedClusters.getClusters().add(feedCluster);
+        }
 
         org.apache.falcon.entity.v0.feed.ACL feedACL = new org.apache.falcon.entity.v0.feed.ACL();
         feedACL.setOwner(USER);
@@ -98,6 +101,10 @@ public final class EntityBuilderTestUtil {
         return feed;
     }
 
+    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups) {
+        return buildFeed(feedName, new Cluster[]{cluster}, tags, groups); // delegate to the Cluster[] overload
+    }
+
     public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
                                                                            Cluster cluster,
                                                                            String tags) throws Exception {


[3/3] git commit: FALCON-590 Update to ACLs added to process is not handled. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-590 Update to ACLs added to process is not handled. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2e3eebdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2e3eebdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2e3eebdf

Branch: refs/heads/master
Commit: 2e3eebdff8d0446b5444c0b728c871265d70fe56
Parents: e9e849e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 9 11:50:21 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 9 11:50:21 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../org/apache/falcon/update/UpdateHelper.java  |  5 ++--
 .../apache/falcon/update/UpdateHelperTest.java  | 31 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e3eebdf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4884512..7c98113 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-590 Update to ACLs added to process is not handled
+   (Venkatesh Seetharam)
+
    FALCON-694 StringIndexOutOfBoundsException while updating graph DB for
    replicated instance (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e3eebdf/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index b6e2893..af93180 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -55,11 +55,12 @@ public final class UpdateHelper {
 
     private static final String[] FEED_FIELDS = new String[]{"partitions", "groups", "lateArrival.cutOff",
                                                              "schema.location", "schema.provider",
-                                                             "ACL.group", "ACL.owner", "ACL.permission", };
+                                                             "group", "owner", "permission", };
     private static final String[] PROCESS_FIELDS = new String[]{"retry.policy", "retry.delay", "retry.attempts",
                                                                 "lateProcess.policy", "lateProcess.delay",
                                                                 "lateProcess.lateInputs[\\d+].input",
-                                                                "lateProcess.lateInputs[\\d+].workflowPath", };
+                                                                "lateProcess.lateInputs[\\d+].workflowPath",
+                                                                "owner", "group", "permission", };
 
     private UpdateHelper() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e3eebdf/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 6366aca..71f251b 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -298,6 +298,37 @@ public class UpdateHelperTest extends AbstractTestBase {
         Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
     }
 
+    @Test
+    public void testIsEntityACLUpdated() throws Exception {
+        Feed oldFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
+        String cluster = "testCluster";
+        Feed newFeed = (Feed) oldFeed.copy();
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+
+        Path feedPath = EntityUtil.getNewStagingPath(clusterEntity, oldFeed);
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
+
+        // Changing the owner and group of the feed ACL must be reported as an update.
+        newFeed.getACL().setOwner("new-user");
+        newFeed.getACL().setGroup("new-group");
+        Assert.assertNotEquals(oldFeed.getACL().getOwner(), newFeed.getACL().getOwner());
+        Assert.assertNotEquals(oldFeed.getACL().getGroup(), newFeed.getACL().getGroup());
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
+
+        Process oldProcess = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
+        prepare(oldProcess);
+        Process newProcess = (Process) oldProcess.copy();
+        Path procPath = EntityUtil.getNewStagingPath(clusterEntity, oldProcess);
+        // The process copy starts out identical; attaching an ACL must then be reported as an update.
+        Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+        org.apache.falcon.entity.v0.process.ACL processACL =
+                new org.apache.falcon.entity.v0.process.ACL();
+        processACL.setOwner("owner");
+        processACL.setGroup("group");
+        newProcess.setACL(processACL);
+        Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+    }
+
     private static Location getLocation(Feed feed, LocationType type, String cluster) {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
         if (feedCluster.getLocations() != null) {