You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2015/01/11 02:37:16 UTC

falcon git commit: FALCON-237 Falcon feed replication should honour availability flag. Contributed by Peeyush Bishnoi

Repository: falcon
Updated Branches:
  refs/heads/master 27e872240 -> 336408f4c


FALCON-237 Falcon feed replication should honour availability flag. Contributed by Peeyush Bishnoi


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/336408f4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/336408f4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/336408f4

Branch: refs/heads/master
Commit: 336408f4cc53919d4cf764513f6c570d13f4c050
Parents: 27e8722
Author: Srikanth Sundarrajan <sr...@apache.org>
Authored: Sun Jan 11 07:06:43 2015 +0530
Committer: Srikanth Sundarrajan <sr...@apache.org>
Committed: Sun Jan 11 07:06:43 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../feed/FeedReplicationCoordinatorBuilder.java |  7 ++++++
 .../feed/FeedRetentionWorkflowBuilder.java      |  1 +
 .../ProcessExecutionWorkflowBuilder.java        |  1 +
 .../action/feed/replication-action.xml          |  2 ++
 .../feed/OozieFeedWorkflowBuilderTest.java      |  2 +-
 .../falcon/replication/FeedReplicator.java      | 25 +++++++++++++++-----
 .../falcon/replication/FilteredCopyListing.java | 12 ++++++----
 8 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 24967a2..8f6c571 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+   FALCON-237 Falcon feed replication should honour availability flag (Peeyush
+   Bishnoi via Srikanth Sundarrajan)
+
    FALCON-417 Upgrade Hive and HCatalog to latest stable version. (Peeyush Bishnoi
    via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 2963ac9..2451bbe 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -181,6 +181,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             instancePaths = pathsWithPartitions;
 
             propagateFileSystemCopyProperties(pathsWithPartitions, props);
+
+            if (entity.getAvailabilityFlag() == null) {
+                props.put("availabilityFlag", "NA");
+            } else {
+                props.put("availabilityFlag", entity.getAvailabilityFlag());
+            }
         } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
             instancePaths = "${coord:dataIn('input')}";
             final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
@@ -189,6 +195,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
             propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
             propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props);
             setupHiveConfiguration(srcCluster, trgCluster, buildPath);
+            props.put("availabilityFlag", "NA");
         }
 
         propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index cbe055a..51e081f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -77,6 +77,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
     private Properties getWorkflowProperties() {
         Properties props = new Properties();
         props.setProperty("srcClusterName", "NA");
+        props.setProperty("availabilityFlag", "NA");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 75faceb..61cc3c2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -111,6 +111,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
     private Properties getWorkflowProperties() {
         Properties props = new Properties();
         props.setProperty("srcClusterName", "NA");
+        props.setProperty("availabilityFlag", "NA");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/resources/action/feed/replication-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
index da40b74..beedd57 100644
--- a/oozie/src/main/resources/action/feed/replication-action.xml
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -52,6 +52,8 @@
         <arg>${distcpTargetPaths}</arg>
         <arg>-falconFeedStorageType</arg>
         <arg>${falconFeedStorageType}</arg>
+        <arg>-availabilityFlag</arg>
+        <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
     </java>
     <ok to="succeeded-post-processing"/>
     <error to="failed-post-processing"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index e5588b4..723f909 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -327,7 +327,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         ACTION replicationActionNode = getAction(workflow, "replication");
         JAVA replication = replicationActionNode.getJava();
         List<String> args = replication.getArg();
-        Assert.assertEquals(args.size(), 13);
+        Assert.assertEquals(args.size(), 15);
 
         HashMap<String, String> props = getCoordProperties(coord);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 90ac753..9e55ffb 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -68,6 +69,13 @@ public class FeedReplicator extends Configured implements Tool {
         final boolean includePathSet = (includePathConf != null)
                 && !IGNORE.equalsIgnoreCase(includePathConf);
 
+        String availabilityFlag = EntityUtil.SUCCEEDED_FILE_NAME;
+        if (cmd.getOptionValue("falconFeedStorageType").equals(Storage.TYPE.FILESYSTEM.name())) {
+            availabilityFlag = cmd.getOptionValue("availabilityFlag").equals("NA")
+                    ? availabilityFlag : cmd.getOptionValue("availabilityFlag");
+        }
+
+        conf.set("falcon.feed.availability.flag", availabilityFlag);
         DistCp distCp = (includePathSet)
                 ? new CustomReplicator(conf, options)
                 : new DistCp(conf, options);
@@ -75,7 +83,7 @@ public class FeedReplicator extends Configured implements Tool {
         distCp.execute();
 
         if (includePathSet) {
-            executePostProcessing(options);  // this only applies for FileSystem Storage.
+            executePostProcessing(conf, options);  // this only applies for FileSystem Storage.
         }
 
         LOG.info("Completed DistCp");
@@ -107,6 +115,10 @@ public class FeedReplicator extends Configured implements Tool {
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("availabilityFlag", true, "availability flag");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }
 
@@ -131,7 +143,7 @@ public class FeedReplicator extends Configured implements Tool {
         return listPaths;
     }
 
-    private void executePostProcessing(DistCpOptions options) throws IOException, FalconException {
+    private void executePostProcessing(Configuration conf, DistCpOptions options) throws IOException, FalconException {
         Path targetPath = options.getTargetPath();
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                 targetPath.toUri(), getConf());
@@ -154,15 +166,16 @@ public class FeedReplicator extends Configured implements Tool {
             finalOutputPath = targetPath;
         }
 
+        final String availabilityFlag = conf.get("falcon.feed.availability.flag");
         FileStatus[] files = fs.globStatus(finalOutputPath);
         if (files != null) {
             for (FileStatus file : files) {
-                fs.create(new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME)).close();
-                LOG.info("Created {}", new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME));
+                fs.create(new Path(file.getPath(), availabilityFlag)).close();
+                LOG.info("Created {}", new Path(file.getPath(), availabilityFlag));
             }
         } else {
-            // As distcp is not copying empty directories we are creating  _SUCCESS file here
-            fs.create(new Path(finalOutputPath, EntityUtil.SUCCEEDED_FILE_NAME)).close();
+            // As distcp is not copying empty directories we are creating availabilityFlag file here
+            fs.create(new Path(finalOutputPath, availabilityFlag)).close();
             LOG.info("No files present in path: {}", finalOutputPath);
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
index 58c09b4..295de92 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.replication;
 
-import org.apache.falcon.entity.EntityUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
@@ -31,9 +30,9 @@ import java.io.IOException;
 import java.util.regex.Pattern;
 
 /**
- * An implementation of CopyListing that overrides the default behavior by suppressing file,
- * EntityUtil.SUCCEEDED_FILE_NAME and copies that in the last so downstream apps
- * depending on data availability will work correctly.
+ * An implementation of CopyListing that overrides the default behavior by suppressing the
+ * availabilityFlag file and copying it last, so that downstream apps depending on data
+ * availability will work correctly.
  */
 public class FilteredCopyListing extends SimpleCopyListing {
     private static final Logger LOG = LoggerFactory.getLogger(FilteredCopyListing.class);
@@ -51,10 +50,13 @@ public class FilteredCopyListing extends SimpleCopyListing {
      */
     private static final char PAT_SET_CLOSE = ']';
 
+    private String availabilityFlag;
+
     private Pattern regex;
 
     protected FilteredCopyListing(Configuration configuration, Credentials credentials) {
         super(configuration, credentials);
+        availabilityFlag = configuration.get("falcon.feed.availability.flag");
         try {
             regex = getRegEx(configuration.get("falcon.include.path", "").trim());
             LOG.info("Inclusion pattern = {}", configuration.get("falcon.include.path"));
@@ -67,7 +69,7 @@ public class FilteredCopyListing extends SimpleCopyListing {
 
     @Override
     protected boolean shouldCopy(Path path, DistCpOptions options) {
-        if (path.getName().equals(EntityUtil.SUCCEEDED_FILE_NAME)) {
+        if (path.getName().equals(availabilityFlag)) {
             return false;
         }
         return regex == null || regex.matcher(path.toString()).find();