You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/09 20:50:35 UTC
[1/3] git commit: FALCON-695 Lineage: "stored-in" edge is added
between feed entity and target cluster. Contributed by Sowmya Ramesh
Repository: incubator-falcon
Updated Branches:
refs/heads/master 5766b74ae -> 2e3eebdff
FALCON-695 Lineage: "stored-in" edge is added between feed entity and target cluster. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e59bef76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e59bef76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e59bef76
Branch: refs/heads/master
Commit: e59bef7626284efec5d0acdb43edfc5806a7d6a4
Parents: 5766b74
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 9 11:47:49 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 9 11:47:49 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../metadata/EntityRelationshipGraphBuilder.java | 17 ++++++++++++-----
2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e59bef76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a3bbc4..757dd99 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-695 Lineage: "stored-in" edge is added between feed entity and
+ target cluster (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-669 Missing optional workflow execution listeners configuration
results in NPE (Raghav Kumar Gautam via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e59bef76/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 1b7a068..c22bcdb 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -22,6 +22,7 @@ import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Vertex;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
@@ -66,7 +67,9 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
addGroups(feed.getGroups(), feedVertex);
for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
- addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
+ if (ClusterType.TARGET != feedCluster.getType()) {
+ addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
+ }
}
}
@@ -264,14 +267,18 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
// remove edges to old clusters
for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) {
- removeEdge(feedEntityVertex, oldCuster.getName(),
- RelationshipLabel.FEED_CLUSTER_EDGE.getName());
+ if (ClusterType.TARGET != oldCuster.getType()) {
+ removeEdge(feedEntityVertex, oldCuster.getName(),
+ RelationshipLabel.FEED_CLUSTER_EDGE.getName());
+ }
}
// add edges to new clusters
for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) {
- addRelationToCluster(feedEntityVertex, newCluster.getName(),
- RelationshipLabel.FEED_CLUSTER_EDGE);
+ if (ClusterType.TARGET != newCluster.getType()) {
+ addRelationToCluster(feedEntityVertex, newCluster.getName(),
+ RelationshipLabel.FEED_CLUSTER_EDGE);
+ }
}
}
[2/3] git commit: FALCON-694 StringIndexOutOfBoundsException while
updating graph DB for replicated instance. Contributed by Sowmya Ramesh
Posted by ve...@apache.org.
FALCON-694 StringIndexOutOfBoundsException while updating graph DB for replicated instance. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e9e849e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e9e849e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e9e849e7
Branch: refs/heads/master
Commit: e9e849e73fd5478c943f1b43daac76f8f8f232b1
Parents: e59bef7
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 9 11:49:21 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 9 11:49:21 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 5 +-
.../workflow/WorkflowExecutionContext.java | 22 +---
.../metadata/MetadataMappingServiceTest.java | 131 +++++++++++++++----
.../feed/FeedReplicationCoordinatorBuilder.java | 4 -
.../feed/OozieFeedWorkflowBuilderTest.java | 6 +-
.../falcon/oozie/process/AbstractTestBase.java | 13 +-
.../cluster/util/EntityBuilderTestUtil.java | 21 ++-
8 files changed, 130 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 757dd99..4884512 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-694 StringIndexOutOfBoundsException while updating graph DB for
+ replicated instance (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-695 Lineage: "stored-in" edge is added between feed entity and
target cluster (Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 2f9fe8e..4d9fbcf 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -185,16 +185,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
String[] outputFeedNames = context.getOutputFeedNamesList();
String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
String targetClusterName = context.getClusterName();
- String srcClusterName = context.getSrcClusterName();
// For replication there will be only one output feed name
String feedName = outputFeedNames[0];
String feedInstanceDataPath = outputFeedInstancePaths[0];
LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + srcClusterName);
+ + feedInstanceDataPath + ", in cluster: " + targetClusterName);
RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
- String feedInstanceName = getFeedInstanceName(feedName, srcClusterName,
+ String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index c074484..9c7b395 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,7 +54,6 @@ public class WorkflowExecutionContext {
public static final String OUTPUT_FEED_SEPARATOR = ",";
public static final String INPUT_FEED_SEPARATOR = "#";
- public static final String CLUSTER_NAME_SEPARATOR = ",";
/**
* Workflow execution status.
@@ -161,26 +160,7 @@ public class WorkflowExecutionContext {
}
public String getClusterName() {
- String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
- if (EntityOperations.REPLICATE != getOperation()) {
- return value;
- }
-
- return value.split(CLUSTER_NAME_SEPARATOR)[0];
- }
-
- public String getSrcClusterName() {
- String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
- if (EntityOperations.REPLICATE != getOperation()) {
- return value;
- }
-
- String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
- if (parts.length != 2) {
- throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
- }
-
- return parts[1];
+ return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
}
public String getEntityName() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 7b73a91..3b9fdba 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
@@ -92,9 +93,8 @@ public class MetadataMappingServiceTest {
public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
public static final String OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
- private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
- "jail://global:00/falcon/imp-click-join1/20140101";
- private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
+ private static final String REPLICATED_INSTANCE = "raw-click";
+ private static final String EVICTED_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
"jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
@@ -273,24 +273,33 @@ public class MetadataMappingServiceTest {
public void testLineageForReplication() throws Exception {
setupForLineageReplication();
- String feedName = "imp-click-join1";
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
- EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, feedName,
- REPLICATED_OUTPUT_INSTANCE_PATHS, null, null), WorkflowExecutionContext.Type.POST_PROCESSING);
+ EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+ "jail://global:00/falcon/raw-click/bcp/20140101",
+ "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
debug(service.getGraph());
GraphUtils.dump(service.getGraph());
- verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
- verifyLineageGraphForReplicationOrEviction(feedName, REPLICATED_OUTPUT_INSTANCE_PATHS, context,
+ verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+ "jail://global:00/falcon/raw-click/bcp/20140101", context,
RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
- // +3 = cluster, colo, tag
- Assert.assertEquals(getVerticesCount(service.getGraph()), 26);
- // +3 = +2 edges for bcp cluster, no user but only to colo and new tag + 1 for replicated-to edge to target
- // cluster for each output feed instance
- Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
+ // +6 [primary, bcp cluster] = cluster, colo, tag,
+ // +4 [input feed] = feed, tag, group, user
+ // +4 [output feed] = 1 feed + 1 tag + 2 groups
+ // +4 [process] = 1 process + 1 tag + 2 pipeline
+ // +3 = 1 process, 1 input, 1 output
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
+ // +4 [cluster] = cluster to colo and tag [primary and bcp],
+ // +4 [input feed] = cluster, tag, group, user
+ // +5 [output feed] = cluster + user + Group + 2Tags
+ // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
+ // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+ // +1 for replicated-to edge to target cluster for each output feed instance
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 40);
}
@Test
@@ -313,7 +322,7 @@ public class MetadataMappingServiceTest {
List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
"imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
- String[] paths = EVICTED_OUTPUT_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
for (String feedInstanceDataPath : paths) {
verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
@@ -481,10 +490,20 @@ public class MetadataMappingServiceTest {
}
private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups,
- Storage.TYPE storageType, String uriTemplate) throws Exception {
- Feed feed = EntityBuilderTestUtil.buildFeed(feedName, cluster,
+ Storage.TYPE storageType, String uriTemplate) throws Exception {
+ return addFeedEntity(feedName, new Cluster[]{cluster}, tags, groups, storageType, uriTemplate);
+ }
+
+ private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups,
+ Storage.TYPE storageType, String uriTemplate) throws Exception {
+ Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters,
tags, groups);
addStorage(feed, storageType, uriTemplate);
+ for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+ if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) {
+ feedCluster.setType(ClusterType.TARGET);
+ }
+ }
configStore.publish(EntityType.FEED, feed);
return feed;
}
@@ -524,6 +543,24 @@ public class MetadataMappingServiceTest {
}
}
+ private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed,
+ Storage.TYPE storageType, String uriTemplate) {
+ if (storageType == Storage.TYPE.FILESYSTEM) {
+ Locations locations = new Locations();
+ feed.setLocations(locations);
+ // NOTE(review): feed-level Locations were just replaced with a fresh object; the DATA path is attached to this feed cluster only - confirm intended.
+ Location location = new Location();
+ location.setType(LocationType.DATA);
+ location.setPath(uriTemplate);
+ cluster.setLocations(new Locations());
+ cluster.getLocations().getLocations().add(location);
+ } else {
+ CatalogTable table = new CatalogTable();
+ table.setUri(uriTemplate);
+ cluster.setTable(table);
+ }
+ }
+
private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) {
Vertex entityVertex = getEntityVertex(entityName, entityType);
Assert.assertNotNull(entityVertex);
@@ -759,7 +796,7 @@ public class MetadataMappingServiceTest {
WorkflowExecutionContext context,
RelationshipLabel edgeLabel) throws Exception {
String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
- , context.getSrcClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
+ , context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
@@ -777,7 +814,7 @@ public class MetadataMappingServiceTest {
String falconInputFeeds) {
String cluster;
if (EntityOperations.REPLICATE == operation) {
- cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
+ cluster = BCP_CLUSTER_ENTITY_NAME;
} else {
cluster = CLUSTER_ENTITY_NAME;
}
@@ -858,14 +895,58 @@ public class MetadataMappingServiceTest {
}
private void setupForLineageReplication() throws Exception {
- setup();
+ cleanUp();
+ service.init();
+
+ // Add cluster
+ clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+ "classification=production");
+ // Add backup cluster
+ Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
+
+ Cluster[] clusters = {clusterEntity, bcpCluster};
+
+ // Add feed
+ Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+ "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
+ // Add uri template for each cluster
+ for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : rawFeed.getClusters().getClusters()) {
+ if (feedCluster.getName().equals(CLUSTER_ENTITY_NAME)) {
+ addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
+ "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}");
+ } else {
+ addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM,
+ "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}");
+ }
+ }
+
+ // update config store
+ try {
+ configStore.initiateUpdate(rawFeed);
+ configStore.update(EntityType.FEED, rawFeed);
+ } finally {
+ configStore.cleanupUpdateInit();
+ }
+ inputFeeds.add(rawFeed);
+
+ // Add output feed
+ Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+ outputFeeds.add(join1Feed);
+
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION);
+
// GENERATE WF should have run before this to create all instance related vertices
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
- EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
- , WorkflowExecutionContext.Type.POST_PROCESSING);
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+ "jail://global:00/falcon/imp-click-join1/20140101",
+ "jail://global:00/falcon/raw-click/primary/20140101",
+ REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
- // Add backup cluster
- addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
}
private void setupForLineageEviciton() throws Exception {
@@ -884,12 +965,12 @@ public class MetadataMappingServiceTest {
// GENERATE WF should have run before this to create all instance related vertices
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
- "imp-click-join1,imp-click-join1", EVICTED_OUTPUT_INSTANCE_PATHS, null, null),
+ "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
// Write to csv file
- String csvData = EVICTED_OUTPUT_INSTANCE_PATHS;
+ String csvData = EVICTED_INSTANCE_PATHS;
logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
Path path = new Path(logFilePath);
EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 5697eb6..966f90e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,10 +159,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
workflow.setAppPath(getStoragePath(buildPath));
Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
- // Override CLUSTER_NAME property to include both source and target cluster
- String clusterProperty = trgCluster.getName()
- + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
- props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 379cf34..3c49353 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- verifyEntityProperties(feed, trgCluster, srcCluster,
+ verifyEntityProperties(feed, trgCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("maxMaps"), "33");
Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
- verifyEntityProperties(aFeed, aCluster, srcCluster,
+ verifyEntityProperties(aFeed, aCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
wfPath.toString());
- verifyEntityProperties(tableFeed, trgCluster, srcCluster,
+ verifyEntityProperties(tableFeed, trgCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index 4e260e9..b547c31 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -217,22 +217,11 @@ public class AbstractTestBase {
protected void verifyEntityProperties(Entity entity, Cluster cluster,
WorkflowExecutionContext.EntityOperations operation,
HashMap<String, String> props) throws Exception {
- verifyEntityProperties(entity, cluster, null, operation, props);
- }
-
- protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
- WorkflowExecutionContext.EntityOperations operation,
- HashMap<String, String> props) throws Exception {
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
entity.getName());
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
entity.getEntityType().name());
- if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
- Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
- cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
- } else {
- Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
- }
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
Assert.assertEquals(props.get("falconDataOperation"), operation.name());
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e9e849e7/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
index b13ec08..66ceb37 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EntityBuilderTestUtil.java
@@ -74,7 +74,7 @@ public final class EntityBuilderTestUtil {
return cluster;
}
- public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups) {
+ public static Feed buildFeed(String feedName, Cluster[] clusters, String tags, String groups) {
Feed feed = new Feed();
feed.setName(feedName);
feed.setTags(tags);
@@ -82,12 +82,15 @@ public final class EntityBuilderTestUtil {
feed.setFrequency(Frequency.fromString("hours(1)"));
org.apache.falcon.entity.v0.feed.Clusters
- clusters = new org.apache.falcon.entity.v0.feed.Clusters();
- feed.setClusters(clusters);
- org.apache.falcon.entity.v0.feed.Cluster feedCluster =
- new org.apache.falcon.entity.v0.feed.Cluster();
- feedCluster.setName(cluster.getName());
- clusters.getClusters().add(feedCluster);
+ feedClusters = new org.apache.falcon.entity.v0.feed.Clusters();
+ feed.setClusters(feedClusters);
+
+ for (Cluster cluster : clusters) {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+ new org.apache.falcon.entity.v0.feed.Cluster();
+ feedCluster.setName(cluster.getName());
+ feedClusters.getClusters().add(feedCluster);
+ }
org.apache.falcon.entity.v0.feed.ACL feedACL = new org.apache.falcon.entity.v0.feed.ACL();
feedACL.setOwner(USER);
@@ -98,6 +101,10 @@ public final class EntityBuilderTestUtil {
return feed;
}
+ public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups) {
+ return buildFeed(feedName, new Cluster[]{cluster}, tags, groups);
+ }
+
public static org.apache.falcon.entity.v0.process.Process buildProcess(String processName,
Cluster cluster,
String tags) throws Exception {
[3/3] git commit: FALCON-590 Update to ACLs added to process is not
handled. Contributed by Venkatesh Seetharam
Posted by ve...@apache.org.
FALCON-590 Update to ACLs added to process is not handled. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2e3eebdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2e3eebdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2e3eebdf
Branch: refs/heads/master
Commit: 2e3eebdff8d0446b5444c0b728c871265d70fe56
Parents: e9e849e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 9 11:50:21 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 9 11:50:21 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../org/apache/falcon/update/UpdateHelper.java | 5 ++--
.../apache/falcon/update/UpdateHelperTest.java | 31 ++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e3eebdf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4884512..7c98113 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -77,6 +77,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-590 Update to ACLs added to process is not handled
+ (Venkatesh Seetharam)
+
FALCON-694 StringIndexOutOfBoundsException while updating graph DB for
replicated instance (Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e3eebdf/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index b6e2893..af93180 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -55,11 +55,12 @@ public final class UpdateHelper {
private static final String[] FEED_FIELDS = new String[]{"partitions", "groups", "lateArrival.cutOff",
"schema.location", "schema.provider",
- "ACL.group", "ACL.owner", "ACL.permission", };
+ "group", "owner", "permission", };
private static final String[] PROCESS_FIELDS = new String[]{"retry.policy", "retry.delay", "retry.attempts",
"lateProcess.policy", "lateProcess.delay",
"lateProcess.lateInputs[\\d+].input",
- "lateProcess.lateInputs[\\d+].workflowPath", };
+ "lateProcess.lateInputs[\\d+].workflowPath",
+ "owner", "group", "permission", };
private UpdateHelper() {}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e3eebdf/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 6366aca..71f251b 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -298,6 +298,37 @@ public class UpdateHelperTest extends AbstractTestBase {
Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
}
+ @Test
+ public void testIsEntityACLUpdated() throws Exception {
+ Feed oldFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
+ String cluster = "testCluster";
+ Feed newFeed = (Feed) oldFeed.copy();
+ Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+
+ Path feedPath = EntityUtil.getNewStagingPath(clusterEntity, oldFeed);
+ Assert.assertFalse(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
+
+ // Feed: an untouched copy is not an update, but changing the ACL owner/group must be reported as one.
+ newFeed.getACL().setOwner("new-user");
+ newFeed.getACL().setGroup("new-group");
+ Assert.assertNotEquals(oldFeed.getACL().getOwner(), newFeed.getACL().getOwner());
+ Assert.assertNotEquals(oldFeed.getACL().getGroup(), newFeed.getACL().getGroup());
+ Assert.assertTrue(UpdateHelper.isEntityUpdated(oldFeed, newFeed, cluster, feedPath));
+ // Process: attaching an ACL (owner and group) to the copy must likewise be reported as an update.
+ Process oldProcess = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
+ prepare(oldProcess);
+ Process newProcess = (Process) oldProcess.copy();
+ Path procPath = EntityUtil.getNewStagingPath(clusterEntity, oldProcess);
+
+ Assert.assertFalse(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+ org.apache.falcon.entity.v0.process.ACL processACL =
+ new org.apache.falcon.entity.v0.process.ACL();
+ processACL.setOwner("owner");
+ processACL.setGroup("group");
+ newProcess.setACL(processACL);
+ Assert.assertTrue(UpdateHelper.isEntityUpdated(oldProcess, newProcess, cluster, procPath));
+ }
+
private static Location getLocation(Feed feed, LocationType type, String cluster) {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
if (feedCluster.getLocations() != null) {