You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/02 22:03:05 UTC
[2/2] git commit: FALCON-579 Lineage breaks if feed.xml doesn't have the date pattern in feed path location. Contributed by Sowmya Ramesh
FALCON-579 Lineage breaks if feed.xml doesn't have the date pattern in feed path location. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/0bd9c775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/0bd9c775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/0bd9c775
Branch: refs/heads/master
Commit: 0bd9c775b6a62aa37f2a32d6d41bbe9e9a982b0e
Parents: d2e5f8c
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 2 13:02:46 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 2 13:02:46 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 32 ++++++----
.../metadata/MetadataMappingServiceTest.java | 63 +++++++++++++++++++-
3 files changed, 86 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bd9c775/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7145ff2..ca12d59 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -72,6 +72,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-579 Lineage breaks if feed.xml doesn't have the date pattern in
+ feed path location (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-642 OozieProcessWorkflowBuilderTest test failures. (Shwetha GS)
FALCON-630 late data rerun for process broken in trunk. (Shwetha GS)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bd9c775/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index e7670da..2f9fe8e 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -194,7 +194,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+ feedInstanceDataPath + ", in cluster: " + srcClusterName);
RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
- String feedInstanceName = getFeedInstanceName(feedName, srcClusterName, feedInstanceDataPath);
+ String feedInstanceName = getFeedInstanceName(feedName, srcClusterName,
+ feedInstanceDataPath, context.getNominalTimeAsISO8601());
Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
@@ -219,7 +220,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
String clusterName = context.getClusterName();
- String[] paths = EvictionHelper.getInstancePaths(ClusterHelper.getFileSystem(clusterName), new Path(logFile));
+ String[] paths = EvictionHelper.getInstancePaths(
+ ClusterHelper.getFileSystem(clusterName), new Path(logFile));
if (paths == null || paths.length <= 0) {
throw new IllegalArgumentException("No instance paths in log file");
}
@@ -230,12 +232,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+ feedInstanceDataPath + ", in cluster: " + clusterName);
RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
- String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+ String feedInstanceName = getFeedInstanceName(feedName, clusterName,
+ feedInstanceDataPath, context.getNominalTimeAsISO8601());
Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
- LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
+ LOG.info("Vertex exists? name={}, type={}, v={}",
+ feedInstanceName, vertexType, feedInstanceVertex);
if (feedInstanceVertex == null) {
- throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+ throw new IllegalStateException(vertexType
+ + " instance vertex must exist " + feedInstanceName);
}
addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
@@ -249,7 +254,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
String clusterName = context.getClusterName();
LOG.info("Computing feed instance for : name=" + feedName + ", path= "
+ feedInstanceDataPath + ", in cluster: " + clusterName);
- String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+ String feedInstanceName = getFeedInstanceName(feedName, clusterName,
+ feedInstanceDataPath, context.getNominalTimeAsISO8601());
LOG.info("Adding feed instance: " + feedInstanceName);
Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
context.getTimeStampAsISO8601());
@@ -271,7 +277,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public static String getFeedInstanceName(String feedName, String clusterName,
- String feedInstancePath) throws FalconException {
+ String feedInstancePath,
+ String nominalTime) throws FalconException {
try {
Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
@@ -279,7 +286,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
return storageType == Storage.TYPE.TABLE
? getTableFeedInstanceName(feed, feedInstancePath, storageType)
- : getFileSystemFeedInstanceName(feedInstancePath, feed, cluster);
+ : getFileSystemFeedInstanceName(feedInstancePath, feed, cluster, nominalTime);
} catch (URISyntaxException e) {
throw new FalconException(e);
@@ -294,7 +301,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed,
- Cluster cluster) throws FalconException {
+ Cluster cluster,
+ String nominalTime) throws FalconException {
Storage rawStorage = FeedHelper.createStorage(cluster, feed);
String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA);
String instance = feedInstancePath;
@@ -304,7 +312,9 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
instance = instance.replaceFirst(element, "");
}
- return feed.getName() + "/"
- + SchemaHelper.formatDateUTCToISO8601(instance, FEED_INSTANCE_FORMAT);
+ return StringUtils.isEmpty(instance)
+ ? feed.getName() + "/" + nominalTime
+ : feed.getName() + "/"
+ + SchemaHelper.formatDateUTCToISO8601(instance, FEED_INSTANCE_FORMAT);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bd9c775/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index f49ada0..7b73a91 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -85,6 +85,9 @@ public class MetadataMappingServiceTest {
public static final String INPUT_INSTANCE_PATHS =
"jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
+ "#jail://global:00/falcon/clicks-feed/2014-01-01";
+ public static final String INPUT_INSTANCE_PATHS_NO_DATE =
+ "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed"
+ + "#jail://global:00/falcon/clicks-feed";
public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
public static final String OUTPUT_INSTANCE_PATHS =
@@ -93,6 +96,8 @@ public class MetadataMappingServiceTest {
"jail://global:00/falcon/imp-click-join1/20140101";
private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
+ public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
+ "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";
public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
@@ -240,6 +245,31 @@ public class MetadataMappingServiceTest {
}
@Test
+ public void testLineageForNoDateInFeedPath() throws Exception {
+ setupForNoDateInFeedPath();
+
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null,
+ OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+
+ // Verify if instance name has nominal time
+ List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName());
+ List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
+ "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
+ Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
+
+ // +5 = 1 process, 2 inputs, 2 outputs
+ Assert.assertEquals(getVerticesCount(service.getGraph()), 22);
+ //+34 = +26 for feed instances + 8 for process instance
+ Assert.assertEquals(getEdgesCount(service.getGraph()), 65);
+ }
+
+ @Test
public void testLineageForReplication() throws Exception {
setupForLineageReplication();
@@ -729,7 +759,7 @@ public class MetadataMappingServiceTest {
WorkflowExecutionContext context,
RelationshipLabel edgeLabel) throws Exception {
String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName
- , context.getSrcClusterName(), feedInstanceDataPath);
+ , context.getSrcClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601());
Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName())
@@ -865,6 +895,37 @@ public class MetadataMappingServiceTest {
EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
}
+ private void setupForNoDateInFeedPath() throws Exception {
+ cleanUp();
+ service.init();
+
+ // Add cluster
+ clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
+ "classification=production");
+
+ // Add input and output feeds
+ Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+ "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/impression-feed");
+ inputFeeds.add(impressionsFeed);
+ Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
+ "/falcon/clicks-feed");
+ inputFeeds.add(clicksFeed);
+ Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join1");
+ outputFeeds.add(join1Feed);
+ Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+ "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join2");
+ outputFeeds.add(join2Feed);
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION);
+
+ }
+
private void cleanUp() throws Exception {
cleanupGraphStore(service.getGraph());
cleanupConfigurationStore(configStore);