You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:57 UTC
[3/4] git commit: FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh
FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/00c6f1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/00c6f1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/00c6f1e5
Branch: refs/heads/master
Commit: 00c6f1e5d3b3a92f7efbf872d330bb88f1df552c
Parents: a94cb7a
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 15:04:48 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/falcon/entity/ClusterHelper.java | 20 +---
.../InstanceRelationshipGraphBuilder.java | 68 ++++++-----
.../falcon/metadata/MetadataMappingService.java | 1 +
.../falcon/retention/EvictedInstanceSerDe.java | 118 +++++++++++++++++++
.../apache/falcon/retention/EvictionHelper.java | 88 --------------
.../falcon/workflow/WorkflowExecutionArgs.java | 2 +-
.../workflow/WorkflowExecutionContext.java | 2 +-
.../metadata/MetadataMappingServiceTest.java | 90 +++++++-------
.../falcon/messaging/JMSMessageProducer.java | 20 +---
.../apache/falcon/retention/FeedEvictor.java | 8 +-
11 files changed, 216 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 523b218..a2bd724 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-731 Lineage capture for evicted instance is broken
+ (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-724 Build fails as Integration test fails (Balu Vellanki via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index cb3ea08..2689cb7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,13 +18,13 @@
package org.apache.falcon.entity;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.*;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
+import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.HashMap;
@@ -40,12 +40,6 @@ public final class ClusterHelper {
private ClusterHelper() {
}
- public static FileSystem getFileSystem(String cluster) throws FalconException {
- Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
- Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
- return HadoopClientFactory.get().createProxiedFileSystem(conf);
- }
-
public static Configuration getConfiguration(Cluster cluster) {
Configuration conf = new Configuration();
@@ -116,10 +110,6 @@ public final class ClusterHelper {
return normalizedPath.substring(0, normalizedPath.length() - 1);
}
- public static String getCompleteLocation(Cluster cluster, String locationKey) {
- return getStorageUrl(cluster) + "/" + getLocation(cluster, locationKey);
- }
-
public static String getLocation(Cluster cluster, String locationKey) {
for (Location loc : cluster.getLocations().getLocations()) {
if (loc.getName().equals(locationKey)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 4d9fbcf..5b5d62c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.common.FeedDataPath;
@@ -34,12 +33,10 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
@@ -51,6 +48,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed
+ private static final String IGNORE = "IGNORE";
// process workflow properties from message
private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
@@ -207,39 +205,33 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
- String outputFeedNamesArg = context.getOutputFeedNames();
- if ("NONE".equals(outputFeedNamesArg)) {
- LOG.info("There are no output feeds for this process, return");
+ final String outputFeedPaths = context.getOutputFeedInstancePaths();
+ if (IGNORE.equals(outputFeedPaths)) {
+ LOG.info("There were no evicted instances, nothing to record");
return;
}
- String logFile = context.getLogFile();
- if (StringUtils.isEmpty(logFile)){
- throw new IllegalArgumentException("csv log file path empty");
- }
-
+ LOG.info("Recording lineage for evicted instances {}", outputFeedPaths);
+ // For retention there will be only one output feed name
+ String feedName = context.getOutputFeedNames();
+ String[] evictedFeedInstancePathList = context.getOutputFeedInstancePathsList();
String clusterName = context.getClusterName();
- String[] paths = EvictionHelper.getInstancePaths(
- ClusterHelper.getFileSystem(clusterName), new Path(logFile));
- if (paths == null || paths.length <= 0) {
- throw new IllegalArgumentException("No instance paths in log file");
- }
- // For retention there will be only one output feed name
- String feedName = outputFeedNamesArg;
- for (String feedInstanceDataPath : paths) {
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + clusterName);
- RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+ for (String evictedFeedInstancePath : evictedFeedInstancePathList) {
+ LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}",
+ feedName, evictedFeedInstancePath, clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName,
- feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+ evictedFeedInstancePath, context.getNominalTimeAsISO8601());
+ Vertex feedInstanceVertex = findVertex(feedInstanceName,
+ RelationshipType.FEED_INSTANCE);
LOG.info("Vertex exists? name={}, type={}, v={}",
- feedInstanceName, vertexType, feedInstanceVertex);
- if (feedInstanceVertex == null) {
- throw new IllegalStateException(vertexType
- + " instance vertex must exist " + feedInstanceName);
+ feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+ if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+ LOG.info("{} instance vertex {} does not exist, add it",
+ RelationshipType.FEED_INSTANCE, feedInstanceName);
+ feedInstanceVertex = addFeedInstance(// add a new instance
+ feedInstanceName, context, feedName, clusterName);
}
addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
@@ -251,16 +243,20 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
WorkflowExecutionContext context, String feedName,
String feedInstanceDataPath) throws FalconException {
String clusterName = context.getClusterName();
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + clusterName);
+ LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+ feedInstanceDataPath, clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- LOG.info("Adding feed instance: " + feedInstanceName);
+ Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
+ addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
+ }
+
+ private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
+ String feedName, String clusterName) throws FalconException {
+ LOG.info("Adding feed instance {}", feedInstanceName);
Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
context.getTimeStampAsISO8601());
- addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-
addInstanceToEntity(feedInstance, feedName,
RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
addInstanceToEntity(feedInstance, clusterName,
@@ -273,6 +269,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
addDataClassification(feed.getTags(), feedInstance);
addGroups(feed.getGroups(), feedInstance);
}
+
+ return feedInstance;
}
public static String getFeedInstanceName(String feedName, String clusterName,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index f607e0a..46f8a61 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -271,6 +271,7 @@ public class MetadataMappingService
case DELETE:
onFeedInstanceEvicted(context);
+ getTransactionalGraph().commit();
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
new file mode 100644
index 0000000..c2f222b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Utility class for serializing and deserializing the evicted instance paths.
+ *
+ * The serialized form is the literal prefix "instancePaths=" followed by the
+ * caller-supplied instance paths, which are expected to be joined with
+ * {@link #INSTANCEPATH_SEPARATOR}.
+ */
+
+public final class EvictedInstanceSerDe {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class);
+
+ // Marker written in front of the path list by serializeEvictedInstancePaths.
+ private static final String INSTANCEPATH_PREFIX = "instancePaths=";
+ // Used by deserializeEvictedInstancePaths to separate the prefix from the path list.
+ // NOTE(review): String.split on "=" also splits inside any instance path that itself
+ // contains '=' (e.g. key=value partition directories). Only the text up to that '='
+ // is then returned, and every later path is silently dropped. Stripping
+ // INSTANCEPATH_PREFIX instead of splitting would avoid this. Confirm against the
+ // feed path formats in use.
+ private static final String INSTANCES_SEPARATOR = "=";
+ // Separator between individual instance paths in the serialized list.
+ public static final String INSTANCEPATH_SEPARATOR = ",";
+
+
+ private EvictedInstanceSerDe() {}
+
+ /**
+ * This method serializes the evicted instances to a file in logs dir for a given feed.
+ * @see org.apache.falcon.retention.FeedEvictor
+ *
+ * *Note:* This is executed within the map task for evictor action
+ *
+ * The file is written as INSTANCEPATH_PREFIX followed by the contents of
+ * {@code instances}. When debug logging is enabled, the file is read back
+ * and its contents are logged.
+ *
+ * @param fileSystem file system handle
+ * @param logFilePath File path to serialize the instances to
+ * @param instances list of instances, comma separated. NOTE: this buffer is
+ * modified in place (the prefix is inserted at position 0), so the caller
+ * sees the prefixed value after this call returns.
+ * @throws IOException if the file cannot be created or written
+ */
+ public static void serializeEvictedInstancePaths(final FileSystem fileSystem,
+ final Path logFilePath,
+ StringBuffer instances) throws IOException {
+ LOG.info("Writing deleted instances {} to path {}", instances, logFilePath);
+ OutputStream out = null;
+ try {
+ out = fileSystem.create(logFilePath);
+ instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix
+ // NOTE(review): getBytes() uses the platform default charset; the reader
+ // decodes with ByteArrayOutputStream.toString(), also platform default, so
+ // writer and reader JVMs must agree on it.
+ out.write(instances.toString().getBytes());
+ } finally {
+ // Close the stream even if the write fails.
+ if (out != null) {
+ out.close();
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ logEvictedInstancePaths(fileSystem, logFilePath);
+ }
+ }
+
+ /**
+ * Reads the just-written file back and logs its contents at debug level.
+ * Only called when debug logging is enabled.
+ *
+ * @param fs file system handle
+ * @param outPath path of the file to read back
+ * @throws IOException if the file cannot be opened or read
+ */
+ private static void logEvictedInstancePaths(final FileSystem fs,
+ final Path outPath) throws IOException {
+ ByteArrayOutputStream writer = new ByteArrayOutputStream();
+ InputStream instance = fs.open(outPath);
+ // The trailing 'true' presumably asks Hadoop's copyBytes to close both streams - confirm.
+ IOUtils.copyBytes(instance, writer, 4096, true);
+ LOG.debug("Instance Paths copied to {}", outPath);
+ LOG.debug("Written {}", writer);
+ }
+
+ /**
+ * This method deserializes the evicted instances from a log file on hdfs.
+ * @see org.apache.falcon.messaging.JMSMessageProducer
+ * *Note:* This is executed within the falcon server
+ *
+ * The log file is deleted on every exit path, including when reading it
+ * fails, so a failed read also discards the serialized state.
+ *
+ * @param fileSystem file system handle
+ * @param logFile File path to deserialize the instances from
+ * @return the instance paths listed after the prefix, split on
+ * INSTANCEPATH_SEPARATOR; an empty array when nothing follows the
+ * prefix (or the file contains no '=' at all)
+ * @throws IOException if the file cannot be opened or read
+ */
+ public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem,
+ final Path logFile) throws IOException {
+ try {
+ ByteArrayOutputStream writer = new ByteArrayOutputStream();
+ InputStream instance = fileSystem.open(logFile);
+ IOUtils.copyBytes(instance, writer, 4096, true);
+ // [0] is "instancePaths"; [1], if present, is the path list. See the NOTE on
+ // INSTANCES_SEPARATOR about paths containing '='.
+ String[] instancePaths = writer.toString().split(INSTANCES_SEPARATOR);
+
+ LOG.info("Deleted feed instance paths file:" + logFile);
+ // String.split drops trailing empty strings, so "instancePaths=" yields length 1.
+ if (instancePaths.length == 1) {
+ LOG.debug("Returning 0 instance paths for feed ");
+ return new String[0];
+ } else {
+ LOG.debug("Returning instance paths for feed " + instancePaths[1]);
+ return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
+ }
+ } finally {
+ // clean up the serialized state
+ fileSystem.delete(logFile, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
deleted file mode 100644
index 5d6481c..0000000
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.retention;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Helper methods to facilitate eviction.
- */
-
-public final class EvictionHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
-
- private static final String INSTANCEPATH_FORMAT = "instancePaths=";
- public static final String INSTANCEPATH_SEPARATOR = ",";
-
-
- private EvictionHelper() {}
-
- public static void logInstancePaths(final FileSystem logfs, final Path path,
- final String data) throws IOException {
- LOG.info("Writing deleted instances to path {}", path);
- OutputStream out = logfs.create(path);
- out.write(INSTANCEPATH_FORMAT.getBytes());
- out.write(data.getBytes());
- out.close();
- debug(logfs, path);
- }
-
- public static String[] getInstancePaths(final FileSystem fs,
- final Path logFile) throws FalconException {
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- try {
- InputStream date = fs.open(logFile);
- IOUtils.copyBytes(date, writer, 4096, true);
- } catch (IOException e) {
- throw new FalconException(e);
- }
- String logData = writer.toString();
- if (StringUtils.isEmpty(logData)) {
- throw new FalconException("csv file is empty");
- }
-
- String[] parts = logData.split(INSTANCEPATH_FORMAT);
- if (parts.length != 2) {
- throw new FalconException("Instance path in csv file not in required format: " + logData);
- }
-
- // part[0] is instancePaths=
- return parts[1].split(INSTANCEPATH_SEPARATOR);
- }
-
- private static void debug(final FileSystem fs, final Path outPath) throws IOException {
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream instance = fs.open(outPath);
- IOUtils.copyBytes(instance, writer, 4096, true);
- LOG.debug("Instance Paths copied to {}", outPath);
- LOG.debug("Written {}", writer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 514bafe..0a8be64 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -75,7 +75,7 @@ public enum WorkflowExecutionArgs {
BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
// state maintained
- LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
+ LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false),
// execution context data recorded
LOG_DIR("logDir", "log dir where lineage can be recorded"),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 04ef037..ef55ba9 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -87,7 +87,7 @@ public class WorkflowExecutionContext {
WorkflowExecutionArgs.RUN_ID,
WorkflowExecutionArgs.STATUS,
WorkflowExecutionArgs.TIMESTAMP,
- WorkflowExecutionArgs.LOG_FILE,
+ WorkflowExecutionArgs.LOG_DIR,
};
private final Map<WorkflowExecutionArgs, String> context;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 3b9fdba..895a5f7 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -41,7 +40,7 @@ import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
@@ -49,7 +48,6 @@ import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -70,7 +68,6 @@ public class MetadataMappingServiceTest {
public static final String FALCON_USER = "falcon-user";
private static final String LOGS_DIR = "/falcon/staging/feed/logs";
- private static final String LOG_FILE = "instancePaths-2014-01-01-01-00.csv";
private static final String NOMINAL_TIME = "2014-01-01-01-00";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -93,7 +90,8 @@ public class MetadataMappingServiceTest {
public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
public static final String OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
- private static final String REPLICATED_INSTANCE = "raw-click";
+ private static final String REPLICATED_FEED = "raw-click";
+ private static final String EVICTED_FEED = "imp-click-join1";
private static final String EVICTED_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
@@ -109,10 +107,6 @@ public class MetadataMappingServiceTest {
private List<Feed> inputFeeds = new ArrayList<Feed>();
private List<Feed> outputFeeds = new ArrayList<Feed>();
private Process processEntity;
- private EmbeddedCluster embeddedCluster;
- private String hdfsUrl;
- private static String logFilePath;
-
@BeforeClass
public void setUp() throws Exception {
@@ -121,6 +115,8 @@ public class MetadataMappingServiceTest {
configStore = ConfigurationStore.get();
Services.get().register(new WorkflowJobEndNotificationService());
+ StartupProperties.get().setProperty("falcon.graph.storage.directory",
+ "target/graphdb-" + System.currentTimeMillis());
StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
service = new MetadataMappingService();
service.init();
@@ -258,7 +254,8 @@ public class MetadataMappingServiceTest {
GraphUtils.dump(service.getGraph());
// Verify if instance name has nominal time
- List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName());
+ List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(
+ RelationshipType.FEED_INSTANCE.getName());
List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
"imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
@@ -270,20 +267,20 @@ public class MetadataMappingServiceTest {
}
@Test
- public void testLineageForReplication() throws Exception {
+ public void testLineageForReplication() throws Exception {
setupForLineageReplication();
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
- EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+ EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
"jail://global:00/falcon/raw-click/bcp/20140101",
- "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+ "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
debug(service.getGraph());
GraphUtils.dump(service.getGraph());
- verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+ verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
"jail://global:00/falcon/raw-click/bcp/20140101", context,
RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
@@ -303,12 +300,11 @@ public class MetadataMappingServiceTest {
}
@Test
- public void testLineageForRetention() throws Exception {
- setupForLineageEviciton();
- String feedName = "imp-click-join1";
+ public void testLineageForRetention() throws Exception {
+ setupForLineageEviction();
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
- feedName, "IGNORE", "IGNORE", feedName),
+ EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
@@ -322,9 +318,9 @@ public class MetadataMappingServiceTest {
List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
"imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
- String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
for (String feedInstanceDataPath : paths) {
- verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
+ verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context,
RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
}
@@ -336,6 +332,27 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
}
+ @Test
+ public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
+ cleanUp();
+ service.init();
+ long beforeVerticesCount = getVerticesCount(service.getGraph());
+ long beforeEdgesCount = getEdgesCount(service.getGraph());
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+ EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+
+ service.onSuccess(context);
+
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+ // No new vertices added
+ Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount);
+ // No new edges added
+ Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount);
+ }
+
@Test (dependsOnMethods = "testOnAdd")
public void testOnChange() throws Exception {
// shutdown the graph and resurrect for testing
@@ -673,7 +690,9 @@ public class MetadataMappingServiceTest {
// feeds owned by a user
List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName());
Assert.assertEquals(feedNamesOwnedByUser,
- Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", "imp-click-join2"));
+ Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1",
+ "imp-click-join2")
+ );
// feeds classified as secure
verifyFeedsClassifiedAsSecure(feedType.getName(),
@@ -736,7 +755,8 @@ public class MetadataMappingServiceTest {
}
}
}
- Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+ Assert.assertTrue(actual.containsAll(expected),
+ "Actual does not contain expected: " + actual);
}
public long getVerticesCount(final Graph graph) {
@@ -855,8 +875,6 @@ public class MetadataMappingServiceTest {
"-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
"-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
- "-" + WorkflowExecutionArgs.LOG_FILE.getName(),
- (logFilePath != null ? logFilePath : LOGS_DIR + "/log" + ".txt"),
};
}
@@ -907,7 +925,7 @@ public class MetadataMappingServiceTest {
Cluster[] clusters = {clusterEntity, bcpCluster};
// Add feed
- Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+ Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters,
"classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
// Add uri template for each cluster
@@ -945,22 +963,12 @@ public class MetadataMappingServiceTest {
EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
"jail://global:00/falcon/imp-click-join1/20140101",
"jail://global:00/falcon/raw-click/primary/20140101",
- REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
+ REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
}
- private void setupForLineageEviciton() throws Exception {
- cleanUp();
- service.init();
-
- // Add cluster
- embeddedCluster = EmbeddedCluster.newCluster(CLUSTER_ENTITY_NAME, true, COLO_NAME,
- "classification=production");
- clusterEntity = embeddedCluster.getCluster();
- configStore.publish(EntityType.CLUSTER, clusterEntity);
- hdfsUrl = embeddedCluster.getConf().get("fs.default.name");
-
- addFeedsAndProcess(clusterEntity);
+ private void setupForLineageEviction() throws Exception {
+ setup();
// GENERATE WF should have run before this to create all instance related vertices
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -968,12 +976,6 @@ public class MetadataMappingServiceTest {
"imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
-
- // Write to csv file
- String csvData = EVICTED_INSTANCE_PATHS;
- logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
- Path path = new Path(logFilePath);
- EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
}
private void setupForNoDateInFeedPath() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 0181e74..a60e951 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,12 +18,12 @@
package org.apache.falcon.messaging;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,9 +35,7 @@ import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
@@ -290,19 +288,7 @@ public class JMSMessageProducer {
return new String[0];
}
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream instance = fs.open(logFile);
- IOUtils.copyBytes(instance, writer, 4096, true);
- String[] instancePaths = writer.toString().split("=");
- fs.delete(logFile, true);
- LOG.info("Deleted feed instance paths file:" + logFile);
- if (instancePaths.length == 1) {
- LOG.debug("Returning 0 instance paths for feed ");
- return new String[0];
- } else {
- LOG.debug("Returning instance paths for feed " + instancePaths[1]);
- return instancePaths[1].split(",");
- }
+ return EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, logFile);
}
private Map<String, String> buildMessage(final WorkflowExecutionArgs[] filter) {
@@ -311,6 +297,8 @@ public class JMSMessageProducer {
message.put(arg.getName(), context.getValue(arg));
}
+ // this is NOT useful since the file is deleted after message is sent
+ message.remove(WorkflowExecutionArgs.LOG_FILE.getName());
return message;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 114071f..9589edf 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -126,7 +126,8 @@ public class FeedEvictor extends Configured implements Tool {
evict(storage, retentionLimit, timeZone);
Path path = new Path(logFile);
- EvictionHelper.logInstancePaths(path.getFileSystem(getConf()), path, instancePaths.toString());
+ EvictedInstanceSerDe.serializeEvictedInstancePaths(
+ path.getFileSystem(getConf()), path, instancePaths);
int len = buffer.length();
if (len > 0) {
@@ -180,7 +181,7 @@ public class FeedEvictor extends Configured implements Tool {
deleteInstance(fs, path, feedBasePath);
Date date = getDate(path, feedPath, dateMask, timeZone);
buffer.append(dateFormat.format(date)).append(',');
- instancePaths.append(path).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
}
@@ -532,7 +533,7 @@ public class FeedEvictor extends Configured implements Tool {
String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
LOG.info("Deleted partition: " + partitionInfo);
buffer.append(partSpec).append(',');
- instancePaths.append(partitionInfo).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ instancePaths.append(partitionInfo).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
}
}
@@ -553,5 +554,4 @@ public class FeedEvictor extends Configured implements Tool {
}
}
}
-
}