You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2015/01/11 02:37:16 UTC
falcon git commit: FALCON-237 Falcon feed replication should honour
availability flag. Contributed by Peeyush Bishnoi
Repository: falcon
Updated Branches:
refs/heads/master 27e872240 -> 336408f4c
FALCON-237 Falcon feed replication should honour availability flag. Contributed by Peeyush Bishnoi
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/336408f4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/336408f4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/336408f4
Branch: refs/heads/master
Commit: 336408f4cc53919d4cf764513f6c570d13f4c050
Parents: 27e8722
Author: Srikanth Sundarrajan <sr...@apache.org>
Authored: Sun Jan 11 07:06:43 2015 +0530
Committer: Srikanth Sundarrajan <sr...@apache.org>
Committed: Sun Jan 11 07:06:43 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../feed/FeedReplicationCoordinatorBuilder.java | 7 ++++++
.../feed/FeedRetentionWorkflowBuilder.java | 1 +
.../ProcessExecutionWorkflowBuilder.java | 1 +
.../action/feed/replication-action.xml | 2 ++
.../feed/OozieFeedWorkflowBuilderTest.java | 2 +-
.../falcon/replication/FeedReplicator.java | 25 +++++++++++++++-----
.../falcon/replication/FilteredCopyListing.java | 12 ++++++----
8 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 24967a2..8f6c571 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-237 falcon feed replication should honour availability flag (Peeyush
+ Bishnoi via Srikanth Sundarrajan)
+
FALCON-417 Upgrade Hive and HCatalog to latest stable version. (Peeyush Bishnoi
via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 2963ac9..2451bbe 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -181,6 +181,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
instancePaths = pathsWithPartitions;
propagateFileSystemCopyProperties(pathsWithPartitions, props);
+
+ if (entity.getAvailabilityFlag() == null) {
+ props.put("availabilityFlag", "NA");
+ } else {
+ props.put("availabilityFlag", entity.getAvailabilityFlag());
+ }
} else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
instancePaths = "${coord:dataIn('input')}";
final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
@@ -189,6 +195,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props);
setupHiveConfiguration(srcCluster, trgCluster, buildPath);
+ props.put("availabilityFlag", "NA");
}
propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index cbe055a..51e081f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -77,6 +77,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
private Properties getWorkflowProperties() {
Properties props = new Properties();
props.setProperty("srcClusterName", "NA");
+ props.setProperty("availabilityFlag", "NA");
return props;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 75faceb..61cc3c2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -111,6 +111,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
private Properties getWorkflowProperties() {
Properties props = new Properties();
props.setProperty("srcClusterName", "NA");
+ props.setProperty("availabilityFlag", "NA");
return props;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/resources/action/feed/replication-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
index da40b74..beedd57 100644
--- a/oozie/src/main/resources/action/feed/replication-action.xml
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -52,6 +52,8 @@
<arg>${distcpTargetPaths}</arg>
<arg>-falconFeedStorageType</arg>
<arg>${falconFeedStorageType}</arg>
+ <arg>-availabilityFlag</arg>
+ <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
</java>
<ok to="succeeded-post-processing"/>
<error to="failed-post-processing"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index e5588b4..723f909 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -327,7 +327,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
ACTION replicationActionNode = getAction(workflow, "replication");
JAVA replication = replicationActionNode.getJava();
List<String> args = replication.getArg();
- Assert.assertEquals(args.size(), 13);
+ Assert.assertEquals(args.size(), 15);
HashMap<String, String> props = getCoordProperties(coord);
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 90ac753..9e55ffb 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -68,6 +69,13 @@ public class FeedReplicator extends Configured implements Tool {
final boolean includePathSet = (includePathConf != null)
&& !IGNORE.equalsIgnoreCase(includePathConf);
+ String availabilityFlag = EntityUtil.SUCCEEDED_FILE_NAME;
+ if (cmd.getOptionValue("falconFeedStorageType").equals(Storage.TYPE.FILESYSTEM.name())) {
+ availabilityFlag = cmd.getOptionValue("availabilityFlag").equals("NA")
+ ? availabilityFlag : cmd.getOptionValue("availabilityFlag");
+ }
+
+ conf.set("falcon.feed.availability.flag", availabilityFlag);
DistCp distCp = (includePathSet)
? new CustomReplicator(conf, options)
: new DistCp(conf, options);
@@ -75,7 +83,7 @@ public class FeedReplicator extends Configured implements Tool {
distCp.execute();
if (includePathSet) {
- executePostProcessing(options); // this only applies for FileSystem Storage.
+ executePostProcessing(conf, options); // this only applies for FileSystem Storage.
}
LOG.info("Completed DistCp");
@@ -107,6 +115,10 @@ public class FeedReplicator extends Configured implements Tool {
opt.setRequired(true);
options.addOption(opt);
+ opt = new Option("availabilityFlag", true, "availability flag");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return new GnuParser().parse(options, args);
}
@@ -131,7 +143,7 @@ public class FeedReplicator extends Configured implements Tool {
return listPaths;
}
- private void executePostProcessing(DistCpOptions options) throws IOException, FalconException {
+ private void executePostProcessing(Configuration conf, DistCpOptions options) throws IOException, FalconException {
Path targetPath = options.getTargetPath();
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
targetPath.toUri(), getConf());
@@ -154,15 +166,16 @@ public class FeedReplicator extends Configured implements Tool {
finalOutputPath = targetPath;
}
+ final String availabilityFlag = conf.get("falcon.feed.availability.flag");
FileStatus[] files = fs.globStatus(finalOutputPath);
if (files != null) {
for (FileStatus file : files) {
- fs.create(new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME)).close();
- LOG.info("Created {}", new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME));
+ fs.create(new Path(file.getPath(), availabilityFlag)).close();
+ LOG.info("Created {}", new Path(file.getPath(), availabilityFlag));
}
} else {
- // As distcp is not copying empty directories we are creating _SUCCESS file here
- fs.create(new Path(finalOutputPath, EntityUtil.SUCCEEDED_FILE_NAME)).close();
+ // As distcp is not copying empty directories we are creating availabilityFlag file here
+ fs.create(new Path(finalOutputPath, availabilityFlag)).close();
LOG.info("No files present in path: {}", finalOutputPath);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
index 58c09b4..295de92 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
@@ -18,7 +18,6 @@
package org.apache.falcon.replication;
-import org.apache.falcon.entity.EntityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
@@ -31,9 +30,9 @@ import java.io.IOException;
import java.util.regex.Pattern;
/**
- * An implementation of CopyListing that overrides the default behavior by suppressing file,
- * EntityUtil.SUCCEEDED_FILE_NAME and copies that in the last so downstream apps
- * depending on data availability will work correctly.
+ * An implementation of CopyListing that overrides the default behavior by holding back the
+ * availabilityFlag file and copying it last, so that downstream apps depending on data
+ * availability will work correctly.
*/
public class FilteredCopyListing extends SimpleCopyListing {
private static final Logger LOG = LoggerFactory.getLogger(FilteredCopyListing.class);
@@ -51,10 +50,13 @@ public class FilteredCopyListing extends SimpleCopyListing {
*/
private static final char PAT_SET_CLOSE = ']';
+ private String availabilityFlag;
+
private Pattern regex;
protected FilteredCopyListing(Configuration configuration, Credentials credentials) {
super(configuration, credentials);
+ availabilityFlag = configuration.get("falcon.feed.availability.flag");
try {
regex = getRegEx(configuration.get("falcon.include.path", "").trim());
LOG.info("Inclusion pattern = {}", configuration.get("falcon.include.path"));
@@ -67,7 +69,7 @@ public class FilteredCopyListing extends SimpleCopyListing {
@Override
protected boolean shouldCopy(Path path, DistCpOptions options) {
- if (path.getName().equals(EntityUtil.SUCCEEDED_FILE_NAME)) {
+ if (path.getName().equals(availabilityFlag)) {
return false;
}
return regex == null || regex.matcher(path.toString()).find();