You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:21 UTC
[16/41] git commit: FALCON-594 Process lineage information for
Retention policies. Contributed by Sowmya Ramesh
FALCON-594 Process lineage information for Retention policies. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d2e5f8c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d2e5f8c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d2e5f8c9
Branch: refs/heads/FALCON-585
Commit: d2e5f8c9ee8c97a0a6011261f1cbabf2dbf4f309
Parents: d74dc32
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 2 12:31:25 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 2 12:31:25 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 41 +++++++
.../falcon/metadata/MetadataMappingService.java | 4 +-
.../falcon/metadata/RelationshipLabel.java | 5 +-
.../apache/falcon/retention/EvictionHelper.java | 88 +++++++++++++++
.../metadata/MetadataMappingServiceTest.java | 112 ++++++++++++++++---
.../apache/falcon/retention/FeedEvictor.java | 30 +----
.../falcon/cluster/util/EmbeddedCluster.java | 24 ++--
8 files changed, 256 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 075fe7e..7145ff2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-594 Process lineage information for Retention policies
+ (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-325 Process lineage information for Replication policies
(Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 452872e..e7670da 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -20,8 +20,10 @@ package org.apache.falcon.metadata;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.Vertex;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.common.FeedDataPath;
@@ -32,6 +34,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.retention.EvictionHelper;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -202,6 +206,43 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
}
+    /**
+     * Records retention lineage: for every feed instance path listed in the eviction log file,
+     * adds an "evicted-from" edge ({@link RelationshipLabel#FEED_CLUSTER_EVICTED_EDGE}) from the
+     * existing feed instance vertex to the cluster entity the instance was evicted from.
+     *
+     * <p>If no instance paths are obtained from the log file, nothing is added to the graph.</p>
+     *
+     * @param context post-processing context of the retention workflow; its output feed name is
+     *                the single feed being retained and its log file is the eviction csv
+     * @throws FalconException if the eviction log file cannot be read or is malformed
+     * @throws IllegalArgumentException if the context carries no log file path
+     * @throws IllegalStateException if an evicted path has no feed instance vertex in the graph
+     */
+    public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
+        String outputFeedNamesArg = context.getOutputFeedNames();
+        if ("NONE".equals(outputFeedNamesArg)) {
+            LOG.info("There are no output feeds for this process, return");
+            return;
+        }
+
+        String logFile = context.getLogFile();
+        if (StringUtils.isEmpty(logFile)) {
+            throw new IllegalArgumentException("csv log file path empty");
+        }
+
+        String clusterName = context.getClusterName();
+        String[] paths = EvictionHelper.getInstancePaths(
+                ClusterHelper.getFileSystem(clusterName), new Path(logFile));
+        if (paths == null || paths.length == 0) {
+            // A retention run may delete nothing; that means there is no lineage to record,
+            // not that post-processing failed.
+            LOG.info("No evicted instance paths in log file {}, nothing to add to lineage", logFile);
+            return;
+        }
+
+        // For retention there will be only one output feed name
+        String feedName = outputFeedNamesArg;
+        RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        for (String feedInstanceDataPath : paths) {
+            LOG.info("Computing feed instance for : name={}, path= {}, in cluster: {}",
+                    feedName, feedInstanceDataPath, clusterName);
+            String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+            Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+
+            LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
+            if (feedInstanceVertex == null) {
+                // Eviction only links instances already recorded by an earlier (e.g. GENERATE) run.
+                throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+            }
+
+            addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
+                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601());
+        }
+    }
+
private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
WorkflowExecutionContext context, String feedName,
String feedInstanceDataPath) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index ab82ce1..f607e0a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -293,8 +293,8 @@ public class MetadataMappingService
instanceGraphBuilder.addReplicatedInstance(context);
}
- private void onFeedInstanceEvicted(WorkflowExecutionContext context) {
+ /**
+ * Records lineage for the feed instances evicted by a retention workflow by delegating to
+ * instanceGraphBuilder.addEvictedInstance(context).
+ *
+ * @param context post-processing context of the retention workflow
+ * @throws FalconException propagated from the graph builder
+ */
+ private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException {
LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
- // todo - tbd
+ instanceGraphBuilder.addEvictedInstance(context);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index acd764f..5b312da 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -39,7 +39,10 @@ public enum RelationshipLabel {
PIPELINES("pipeline"),
// replication labels
- FEED_CLUSTER_REPLICATED_EDGE("replicated-to");
+ FEED_CLUSTER_REPLICATED_EDGE("replicated-to"),
+
+ // eviction labels
+ FEED_CLUSTER_EVICTED_EDGE("evicted-from");
private final String name;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
new file mode 100644
index 0000000..5d6481c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Helper methods shared by the feed evictor and the lineage (metadata) service to write and read
+ * the eviction log file that lists the feed instance paths deleted by a retention run.
+ *
+ * <p>File format: {@code instancePaths=<path>,<path>,...}. The writer may leave a trailing
+ * separator, and a run that deleted nothing produces just {@code instancePaths=}. The file is
+ * written and read as UTF-8 because the writer and the reader may run in different JVMs.</p>
+ */
+public final class EvictionHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
+
+    /** Marker written at the start of the eviction log file, immediately followed by the paths. */
+    private static final String INSTANCEPATH_FORMAT = "instancePaths=";
+    /** Separator between evicted instance paths in the eviction log file. */
+    public static final String INSTANCEPATH_SEPARATOR = ",";
+
+    /** Fixed encoding so the file round-trips regardless of each JVM's default charset. */
+    private static final String ENCODING = "UTF-8";
+
+    private static final int BUFFER_SIZE = 4096;
+
+    private EvictionHelper() {}
+
+    /**
+     * Writes the eviction log file: the {@code instancePaths=} marker followed by {@code data}.
+     *
+     * @param logfs file system that owns {@code path}
+     * @param path  eviction log file to create
+     * @param data  evicted instance paths joined with {@link #INSTANCEPATH_SEPARATOR}
+     * @throws IOException if the file cannot be created or written
+     */
+    public static void logInstancePaths(final FileSystem logfs, final Path path,
+                                        final String data) throws IOException {
+        LOG.info("Writing deleted instances to path {}", path);
+        OutputStream out = logfs.create(path);
+        try {
+            out.write(INSTANCEPATH_FORMAT.getBytes(ENCODING));
+            out.write(data.getBytes(ENCODING));
+        } finally {
+            // Close even when a write fails so the stream is not leaked.
+            out.close();
+        }
+        debug(logfs, path);
+    }
+
+    /**
+     * Reads an eviction log file written by {@link #logInstancePaths} and returns the paths it lists.
+     *
+     * @param fs      file system that owns {@code logFile}
+     * @param logFile eviction log file
+     * @return the evicted instance paths, without empty entries; an empty array when the run
+     *         deleted nothing
+     * @throws FalconException if the file cannot be read, is empty, or does not contain exactly
+     *                         one {@code instancePaths=} marker
+     */
+    public static String[] getInstancePaths(final FileSystem fs,
+                                            final Path logFile) throws FalconException {
+        String logData;
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            InputStream in = fs.open(logFile);
+            // close=true: copyBytes closes both streams when it is done, even on failure.
+            IOUtils.copyBytes(in, buffer, BUFFER_SIZE, true);
+            logData = buffer.toString(ENCODING);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+        if (StringUtils.isEmpty(logData)) {
+            throw new FalconException("csv file is empty");
+        }
+
+        int markerStart = logData.indexOf(INSTANCEPATH_FORMAT);
+        int pathsStart = markerStart + INSTANCEPATH_FORMAT.length();
+        if (markerStart < 0 || logData.indexOf(INSTANCEPATH_FORMAT, pathsStart) >= 0) {
+            throw new FalconException("Instance path in csv file not in required format: " + logData);
+        }
+
+        // StringUtils.split drops empty tokens, so a trailing separator or a run that evicted
+        // nothing ("instancePaths=") yields no bogus empty path. The separator is a single
+        // character, so split's "any of these characters" semantics match it exactly.
+        return StringUtils.split(logData.substring(pathsStart), INSTANCEPATH_SEPARATOR);
+    }
+
+    /** Logs the written file back at debug level; skips the extra read when debug is disabled. */
+    private static void debug(final FileSystem fs, final Path outPath) throws IOException {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        ByteArrayOutputStream contents = new ByteArrayOutputStream();
+        InputStream instance = fs.open(outPath);
+        IOUtils.copyBytes(instance, contents, BUFFER_SIZE, true);
+        LOG.debug("Instance Paths copied to {}", outPath);
+        LOG.debug("Written {}", contents.toString(ENCODING));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 3f3f539..f49ada0 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -39,6 +40,7 @@ import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.retention.EvictionHelper;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
@@ -46,6 +48,7 @@ import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -65,7 +68,8 @@ import java.util.Set;
public class MetadataMappingServiceTest {
public static final String FALCON_USER = "falcon-user";
- private static final String LOGS_DIR = "target/log";
+ private static final String LOGS_DIR = "/falcon/staging/feed/logs";
+ private static final String LOG_FILE = "instancePaths-2014-01-01-01-00.csv";
private static final String NOMINAL_TIME = "2014-01-01-01-00";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -74,6 +78,7 @@ public class MetadataMappingServiceTest {
public static final String COLO_NAME = "west-coast";
public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
+ private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow";
public static final String WORKFLOW_VERSION = "1.0.9";
public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
@@ -86,6 +91,8 @@ public class MetadataMappingServiceTest {
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101";
+ private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
+ "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
@@ -97,6 +104,9 @@ public class MetadataMappingServiceTest {
private List<Feed> inputFeeds = new ArrayList<Feed>();
private List<Feed> outputFeeds = new ArrayList<Feed>();
private Process processEntity;
+ private EmbeddedCluster embeddedCluster;
+ private String hdfsUrl;
+ private static String logFilePath;
@BeforeClass
@@ -253,6 +263,40 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
}
+    @Test
+    public void testLineageForRetention() throws Exception {
+        try {
+            setupForLineageEviciton();
+            String feedName = "imp-click-join1";
+            WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                    EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+                    feedName, "IGNORE", "IGNORE", feedName),
+                    WorkflowExecutionContext.Type.POST_PROCESSING);
+
+            service.onSuccess(context);
+
+            debug(service.getGraph());
+            GraphUtils.dump(service.getGraph());
+            List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z",
+                    "clicks-feed/2014-01-01T00:00Z",
+                    "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
+            List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z",
+                    "clicks-feed/2014-01-01T00:00Z");
+            List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
+                    "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
+            verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds,
+                    ownedAndSecureFeeds);
+            String[] paths = EVICTED_OUTPUT_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+            for (String feedInstanceDataPath : paths) {
+                verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
+                        RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
+            }
+
+            // No new vertices added
+            Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+            // Edges: +2 evicted-from edges (one per evicted feed instance, to the cluster), and -1
+            // because setup recorded imp-click-join1 twice instead of imp-click-join2, so the
+            // classified-as -> Secure edge imp-click-join2 would contribute is absent.
+            Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
+        } finally {
+            // logFilePath is static and getTestMessageArgs() prefers it over the default log file;
+            // clear it so tests that run after this one are not pointed at this test's csv file.
+            logFilePath = null;
+        }
+    }
+
@Test (dependsOnMethods = "testOnAdd")
public void testOnChange() throws Exception {
// shutdown the graph and resurrect for testing
@@ -647,11 +691,20 @@ public class MetadataMappingServiceTest {
}
private void verifyLineageGraph(String feedType) {
+        // Default expectations: one instance each of the two input feeds and the two output feeds.
+        verifyLineageGraph(feedType,
+                Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
+                        "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"),
+                // feeds classified as Secure
+                Arrays.asList("impression-feed/2014-01-01T00:00Z",
+                        "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"),
+                // feeds owned by the user and classified as Financial
+                Arrays.asList("clicks-feed/2014-01-01T00:00Z",
+                        "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
+    }
+
+ private void verifyLineageGraph(String feedType, List<String> expectedFeeds,
+ List<String> secureFeeds, List<String> ownedAndSecureFeeds) {
// feeds owned by a user
List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType);
- List<String> expected = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
- "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
- Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
+ Assert.assertTrue(feedNamesOwnedByUser.containsAll(expectedFeeds));
Graph graph = service.getGraph();
@@ -666,14 +719,10 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(vertexById, feedInstanceVertex);
// feeds classified as secure
- verifyFeedsClassifiedAsSecure(feedType,
- Arrays.asList("impression-feed/2014-01-01T00:00Z",
- "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
+ verifyFeedsClassifiedAsSecure(feedType, secureFeeds);
// feeds owned by a user and classified as secure
- verifyFeedsOwnedByUserAndClassification(feedType, "Financial",
- Arrays.asList("clicks-feed/2014-01-01T00:00Z",
- "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
+ verifyFeedsOwnedByUserAndClassification(feedType, "Financial", ownedAndSecureFeeds);
}
private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath,
@@ -739,7 +788,8 @@ public class MetadataMappingServiceTest {
"-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
"-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
- "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+ "-" + WorkflowExecutionArgs.LOG_FILE.getName(),
+ (logFilePath != null ? logFilePath : LOGS_DIR + "/log" + ".txt"),
};
}
@@ -751,27 +801,30 @@ public class MetadataMappingServiceTest {
clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
"classification=production");
+ addFeedsAndProcess(clusterEntity);
+ }
+
+ /**
+ * Registers the input feeds (impression-feed, clicks-feed), the output feeds
+ * (imp-click-join1, imp-click-join2) and the process, all attached to the given cluster.
+ *
+ * @param cluster cluster every feed and the process are bound to
+ */
+ private void addFeedsAndProcess(Cluster cluster) throws Exception {
// Add input and output feeds
- Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+ Feed impressionsFeed = addFeedEntity("impression-feed", cluster,
"classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
inputFeeds.add(impressionsFeed);
- Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+ Feed clicksFeed = addFeedEntity("clicks-feed", cluster,
"classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
inputFeeds.add(clicksFeed);
- Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ Feed join1Feed = addFeedEntity("imp-click-join1", cluster,
"classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
outputFeeds.add(join1Feed);
- Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+ Feed join2Feed = addFeedEntity("imp-click-join2", cluster,
"classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
"/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
outputFeeds.add(join2Feed);
- processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ // Use the parameter like the feeds above, not the clusterEntity field, so the process is
+ // bound to the same cluster as its feeds.
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, cluster,
"classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
WORKFLOW_VERSION);
-
}
private void setupForLineageReplication() throws Exception {
@@ -785,6 +838,33 @@ public class MetadataMappingServiceTest {
addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
}
+    /**
+     * Prepares the graph for the retention lineage test. It registers an embedded cluster with
+     * the feeds and process, and replays a GENERATE run that creates the feed instance vertices
+     * the eviction edges attach to. It then writes the eviction csv the retention context points
+     * at, and sets the static {@code logFilePath} as a side effect.
+     */
+    private void setupForLineageEviciton() throws Exception {
+        cleanUp();
+        service.init();
+
+        // Cluster backed by an embedded file system so the csv log below has somewhere to live.
+        embeddedCluster = EmbeddedCluster.newCluster(CLUSTER_ENTITY_NAME, true, COLO_NAME,
+                "classification=production");
+        clusterEntity = embeddedCluster.getCluster();
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+        hdfsUrl = embeddedCluster.getConf().get("fs.default.name");
+
+        addFeedsAndProcess(clusterEntity);
+
+        // Eviction lineage only links existing feed instance vertices, so create them first by
+        // replaying a successful GENERATE run over the paths that will be evicted.
+        service.onSuccess(WorkflowExecutionContext.create(
+                getTestMessageArgs(EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
+                        "imp-click-join1,imp-click-join1", EVICTED_OUTPUT_INSTANCE_PATHS, null, null),
+                WorkflowExecutionContext.Type.POST_PROCESSING));
+
+        // Write the evicted paths through EvictionHelper, the same writer FeedEvictor uses.
+        logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
+        Path csvPath = new Path(logFilePath);
+        EvictionHelper.logInstancePaths(csvPath.getFileSystem(EmbeddedCluster.newConfiguration()),
+                csvPath, EVICTED_OUTPUT_INSTANCE_PATHS);
+    }
+
private void cleanUp() throws Exception {
cleanupGraphStore(service.getGraph());
cleanupConfigurationStore(configStore);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 4de7938..114071f 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
@@ -48,10 +47,7 @@ import org.slf4j.LoggerFactory;
import javax.servlet.jsp.el.ELException;
import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.ParseException;
@@ -107,7 +103,7 @@ public class FeedEvictor extends Configured implements Tool {
}
private final Map<VARS, String> map = new TreeMap<VARS, String>();
- private final StringBuffer instancePaths = new StringBuffer("instancePaths=");
+ private final StringBuffer instancePaths = new StringBuffer();
private final StringBuffer buffer = new StringBuffer();
@Override
@@ -129,7 +125,8 @@ public class FeedEvictor extends Configured implements Tool {
Storage storage = FeedHelper.createStorage(feedStorageType, feedPattern);
evict(storage, retentionLimit, timeZone);
- logInstancePaths(new Path(logFile));
+ Path path = new Path(logFile);
+ EvictionHelper.logInstancePaths(path.getFileSystem(getConf()), path, instancePaths.toString());
int len = buffer.length();
if (len > 0) {
@@ -183,7 +180,7 @@ public class FeedEvictor extends Configured implements Tool {
deleteInstance(fs, path, feedBasePath);
Date date = getDate(path, feedPath, dateMask, timeZone);
buffer.append(dateFormat.format(date)).append(',');
- instancePaths.append(path).append(",");
+ instancePaths.append(path).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
}
}
@@ -197,15 +194,6 @@ public class FeedEvictor extends Configured implements Tool {
}
- private void logInstancePaths(Path path) throws IOException {
- LOG.info("Writing deleted instances to path {}", path);
- FileSystem logfs = path.getFileSystem(getConf());
- OutputStream out = logfs.create(path);
- out.write(instancePaths.toString().getBytes());
- out.close();
- debug(logfs, path);
- }
-
private Pair<Date, Date> getDateRange(String period) throws ELException {
Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
Long.class, RESOLVER, RESOLVER);
@@ -330,14 +318,6 @@ public class FeedEvictor extends Configured implements Tool {
deleteParentIfEmpty(fs, path.getParent(), feedBasePath);
}
- private void debug(FileSystem fs, Path outPath) throws IOException {
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream instance = fs.open(outPath);
- IOUtils.copyBytes(instance, writer, 4096, true);
- LOG.debug("Instance Paths copied to {}", outPath);
- LOG.debug("Written {}", writer);
- }
-
private CommandLine getCommand(String[] args) throws org.apache.commons.cli.ParseException {
Options options = new Options();
@@ -552,7 +532,7 @@ public class FeedEvictor extends Configured implements Tool {
String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
LOG.info("Deleted partition: " + partitionInfo);
buffer.append(partSpec).append(',');
- instancePaths.append(partitionInfo).append(",");
+ instancePaths.append(partitionInfo).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index 9512fa8..29c2ec4 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -58,11 +58,11 @@ public class EmbeddedCluster {
}
+ /**
+ * Creates an embedded cluster on {@code jail://<name>:00} with the default colo and no tags.
+ */
public static EmbeddedCluster newCluster(final String name) throws Exception {
- return createClusterAsUser(name, false);
+ return createClusterAsUser(name, false, null, null);
}
+ /**
+ * Creates an embedded cluster with the default colo and no tags; when {@code global} is true
+ * its file system URL is {@code jail://global:00} instead of {@code jail://<name>:00}.
+ */
public static EmbeddedCluster newCluster(final String name, boolean global) throws Exception {
- return createClusterAsUser(name, global);
+ return createClusterAsUser(name, global, null, null);
}
public static EmbeddedCluster newCluster(final String name,
@@ -71,18 +71,24 @@ public class EmbeddedCluster {
return hdfsUser.doAs(new PrivilegedExceptionAction<EmbeddedCluster>() {
@Override
public EmbeddedCluster run() throws Exception {
- return createClusterAsUser(name, false);
+ return createClusterAsUser(name, false, null, null);
}
});
}
- private static EmbeddedCluster createClusterAsUser(String name, boolean global) throws IOException {
+    /**
+     * Creates an embedded cluster with an explicit colo and tags for its {@code Cluster} entity.
+     *
+     * @param name   cluster name
+     * @param global when true the file system URL is {@code jail://global:00}; otherwise
+     *               {@code jail://<name>:00}
+     * @param colo   colo of the cluster entity; {@code null} means the default {@code "local"}
+     * @param tags   tags of the cluster entity; {@code null} leaves them unset
+     * @return the new embedded cluster
+     * @throws Exception if the cluster cannot be created
+     */
+    public static EmbeddedCluster newCluster(final String name, boolean global, final String colo,
+                                             final String tags) throws Exception {
+        final EmbeddedCluster cluster = createClusterAsUser(name, global, colo, tags);
+        return cluster;
+    }
+
+ /**
+ * Builds an embedded cluster whose fs.default.name is {@code jail://global:00} when
+ * {@code global} is true and {@code jail://<name>:00} otherwise. It then fills in the
+ * cluster's Cluster entity from {@code name}, {@code colo} and {@code tags} via
+ * buildClusterObject.
+ */
+ private static EmbeddedCluster createClusterAsUser(String name, boolean global, String colo,
+ String tags) throws IOException {
EmbeddedCluster cluster = new EmbeddedCluster();
cluster.conf.set("fs.default.name", "jail://" + (global ? "global" : name) + ":00");
String hdfsUrl = cluster.conf.get("fs.default.name");
LOG.info("Cluster Namenode = {}", hdfsUrl);
- cluster.buildClusterObject(name);
+ cluster.buildClusterObject(name, colo, tags);
return cluster;
}
@@ -90,10 +96,14 @@ public class EmbeddedCluster {
return FileSystem.get(conf);
}
- protected void buildClusterObject(String name) {
+ protected void buildClusterObject(String name, String colo, String tags) {
clusterEntity = new Cluster();
clusterEntity.setName(name);
- clusterEntity.setColo("local");
+ clusterEntity.setColo((colo == null) ? "local" : colo);
+ clusterEntity.setDescription("Embedded cluster: " + name);
+ if (tags != null) {
+ clusterEntity.setTags(tags);
+ }
clusterEntity.setDescription("Embedded cluster: " + name);
Interfaces interfaces = new Interfaces();