Posted to commits@falcon.apache.org by aj...@apache.org on 2015/07/24 03:12:36 UTC

falcon git commit: FALCON-668 FeedReplicator improvement to include more DistCP options. Contributed by Sowmya Ramesh.

Repository: falcon
Updated Branches:
  refs/heads/master 952a9e6c3 -> cc80a1754


FALCON-668 FeedReplicator improvement to include more DistCP options. Contributed by Sowmya Ramesh.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cc80a175
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cc80a175
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cc80a175

Branch: refs/heads/master
Commit: cc80a17546db614098709d71dfbb2a270bfbf4ac
Parents: 952a9e6
Author: Ajay Yadava <aj...@gmail.com>
Authored: Fri Jul 24 05:23:02 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Fri Jul 24 05:23:59 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/util/ReplicationDistCpOption.java    | 43 +++++++++++
 docs/src/site/twiki/EntitySpecification.twiki   | 19 ++++-
 .../OozieOrchestrationWorkflowBuilder.java      | 15 ++++
 .../feed/FSReplicationWorkflowBuilder.java      |  1 +
 .../feed/HCatReplicationWorkflowBuilder.java    |  1 +
 .../falcon/replication/FeedReplicator.java      | 77 +++++++++++++++++++-
 .../falcon/replication/FeedReplicatorTest.java  | 58 +++++++++++++++
 8 files changed, 212 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c8e31a..45d01b0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
     FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-668 FeedReplicator improvement to include more DistCP options(Sowmya Ramesh via Ajay Yadava)
+
     FALCON-1320 Adding equals() and hashCode() method in LineageGraphResult.Edge(Pragya Mittal via Ajay Yadava)
 
     FALCON-1139 Validation issues in Falcon UI(Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
new file mode 100644
index 0000000..a8b99bb
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+/**
+ * Enum for the DistCp options supported by feed replication.
+ */
+public enum ReplicationDistCpOption {
+
+    DISTCP_OPTION_OVERWRITE("overwrite"),
+    DISTCP_OPTION_IGNORE_ERRORS("ignoreErrors"),
+    DISTCP_OPTION_SKIP_CHECKSUM("skipChecksum"),
+    DISTCP_OPTION_REMOVE_DELETED_FILES("removeDeletedFiles"),
+    DISTCP_OPTION_PRESERVE_BLOCK_SIZE("preserveBlockSize"),
+    DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER("preserveReplicationNumber"),
+    DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission");
+
+    private final String name;
+
+    ReplicationDistCpOption(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+}
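
For reference, the enum only stores the user-facing property names; anything that needs to go from a property name
back to the constant has to scan values(). A minimal sketch of such a lookup, assuming falcon-common (and hence the
new enum) is on the classpath; the helper itself is hypothetical and not part of the patch:

    import org.apache.falcon.util.ReplicationDistCpOption;

    /** Hypothetical helper, not part of this commit. */
    public final class DistCpOptionLookup {

        private DistCpOptionLookup() {}

        /** Resolves a feed property name such as "removeDeletedFiles" to its option constant, or null. */
        public static ReplicationDistCpOption fromPropertyName(String propertyName) {
            for (ReplicationDistCpOption option : ReplicationDistCpOption.values()) {
                if (option.getName().equals(propertyName)) {
                    return option;
                }
            }
            return null; // unknown property names are simply not treated as DistCp options
        }
    }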

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 0c1fae2..98d6153 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -280,6 +280,13 @@ permission indicates the permission.
         <property name="parallel" value="3"/>
         <property name="maxMaps" value="8"/>
         <property name="mapBandwidth" value="1"/>
+        <property name="overwrite" value="true"/>
+        <property name="ignoreErrors" value="false"/>
+        <property name="skipChecksum" value="false"/>
+        <property name="removeDeletedFiles" value="true"/>
+        <property name="preserveBlockSize" value="true"/>
+        <property name="preserveReplicationNumber" value="true"/>
+        <property name="preservePermission" value="true"/>
         <property name="order" value="LIFO"/>
     </properties>
 </verbatim>
@@ -288,9 +295,15 @@ available to user to specify the Hadoop job queue and priority, the same values
 "timeout", "parallel" and "order" are other special properties which decides replication instance's timeout value while
 waiting for the feed instance, parallel decides the concurrent replication instances that can run at any given time and
 order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY.
-"maxMaps" represents the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s
-used by each mapper during replication.
- 
+DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents
+the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s
+used by each mapper during replication. "overwrite" indicates whether the destination should be overwritten during
+replication. "ignoreErrors" indicates whether copy failures should be ignored instead of failing the job.
+"skipChecksum" indicates whether checksum verification should be bypassed during replication. "removeDeletedFiles"
+indicates whether files that exist in the destination but not in the source should be deleted during replication.
+"preserveBlockSize", "preserveReplicationNumber" and "preservePermission" indicate whether the block size, replication
+factor and file permissions, respectively, should be preserved during replication.
+
 ---++ Process Specification
 A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines  the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.  
 

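As a rough illustration of the replication properties documented above, every DistCp-related property a feed sets is
forwarded to the replication action as a "-<name> <value>" argument pair for FeedReplicator. A minimal, JDK-only
sketch of that mapping (illustrative only, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class ReplicationArgsSketch {
        public static void main(String[] args) {
            // Feed properties as they would appear in the entity definition above.
            Properties feedProps = new Properties();
            feedProps.setProperty("overwrite", "true");
            feedProps.setProperty("removeDeletedFiles", "true");

            String[] supported = {"overwrite", "ignoreErrors", "skipChecksum", "removeDeletedFiles",
                "preserveBlockSize", "preserveReplicationNumber", "preservePermission"};

            // Mirror of the builder logic: each non-empty property becomes "-<name> <value>".
            List<String> replicatorArgs = new ArrayList<String>();
            for (String name : supported) {
                String value = feedProps.getProperty(name);
                if (value != null && !value.isEmpty()) {
                    replicatorArgs.add("-" + name);
                    replicatorArgs.add(value);
                }
            }
            System.out.println(replicatorArgs); // [-overwrite, true, -removeDeletedFiles, true]
        }
    }
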
http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index f8220ec..f7193a3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
@@ -46,6 +47,7 @@ import org.apache.falcon.oozie.workflow.START;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -160,6 +162,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         action.getError().setTo(fail);
     }
 
+    protected void addAdditionalReplicationProperties(ACTION replicationAction) {
+        List<String> args = replicationAction.getJava().getArg();
+        Properties props = getEntityProperties(entity);
+
+        for (ReplicationDistCpOption distcpOption : ReplicationDistCpOption.values()) {
+            String propertyValue = props.getProperty(distcpOption.getName());
+            if (StringUtils.isNotEmpty(propertyValue)) {
+                args.add("-" + distcpOption.getName());
+                args.add(propertyValue);
+            }
+        }
+    }
+
     protected void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction) {
         wf.setName(name);
         wf.setStart(new START());

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 0381e59..b82f4e0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -55,6 +55,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         //Add replication
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
         addHDFSServersConfig(replication, src, target);
+        addAdditionalReplicationProperties(replication);
         addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 347ddaf..ed86b32 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -69,6 +69,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
 
         //Add replication
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+        addAdditionalReplicationProperties(replication);
         addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index b2175b2..a226058 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -27,6 +27,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -127,6 +128,39 @@ public class FeedReplicator extends Configured implements Tool {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName(), true, "option to force overwrite");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(), true,
+                "ignore failures during copy instead of aborting");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName(), true, "skip checksums");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), true,
+                "remove deleted files - should there be files in the target directory that"
+                        + "were removed from the source directory");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName(), true,
+                "preserve block size");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName(), true,
+                "preserve replication count");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName(), true,
+                "preserve permissions");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }
 
@@ -136,10 +170,51 @@ public class FeedReplicator extends Configured implements Tool {
         String trgPath = cmd.getOptionValue("targetPath").trim();
 
         DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(trgPath));
-        distcpOptions.setSyncFolder(true);
         distcpOptions.setBlocking(true);
         distcpOptions.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
         distcpOptions.setMapBandwidth(Integer.valueOf(cmd.getOptionValue("mapBandwidth")));
+
+        String overwrite = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName());
+        if (StringUtils.isNotEmpty(overwrite) && overwrite.equalsIgnoreCase(Boolean.TRUE.toString())) {
+            distcpOptions.setOverwrite(Boolean.parseBoolean(overwrite));
+        } else {
+            distcpOptions.setSyncFolder(true);
+        }
+
+        String ignoreErrors = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName());
+        if (StringUtils.isNotEmpty(ignoreErrors)) {
+            distcpOptions.setIgnoreFailures(Boolean.parseBoolean(ignoreErrors));
+        }
+
+        String skipChecksum = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName());
+        if (StringUtils.isNotEmpty(skipChecksum)) {
+            distcpOptions.setSkipCRC(Boolean.parseBoolean(skipChecksum));
+        }
+
+        String removeDeletedFiles = cmd.getOptionValue(
+                ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName());
+        if (StringUtils.isNotEmpty(removeDeletedFiles)) {
+            distcpOptions.setDeleteMissing(Boolean.parseBoolean(removeDeletedFiles));
+        }
+
+        String preserveBlockSize = cmd.getOptionValue(
+                ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName());
+        if (preserveBlockSize != null && Boolean.parseBoolean(preserveBlockSize)) {
+            distcpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+        }
+
+        String preserveReplicationCount = cmd.getOptionValue(ReplicationDistCpOption
+                .DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName());
+        if (preserveReplicationCount != null && Boolean.parseBoolean(preserveReplicationCount)) {
+            distcpOptions.preserve(DistCpOptions.FileAttribute.REPLICATION);
+        }
+
+        String preservePermission = cmd.getOptionValue(
+                ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName());
+        if (preservePermission != null && Boolean.parseBoolean(preservePermission)) {
+            distcpOptions.preserve(DistCpOptions.FileAttribute.PERMISSION);
+        }
+
         return distcpOptions;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index 539d00d..9cfeb30 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -57,9 +57,67 @@ public class FeedReplicatorTest {
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
+        validateMandatoryArguments(options, srcPaths, true);
+    }
+
+    @Test
+    public void testOptionalArguments() throws Exception {
+        /*
+         * <arg>-update</arg>
+         * <arg>-blocking</arg><arg>true</arg>
+         * <arg>-maxMaps</arg><arg>3</arg>
+         * <arg>-mapBandwidth</arg><arg>4</arg>
+         * <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
+         * <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
+         * <arg>-overwrite</arg><arg>true</arg>
+         * <arg>-ignoreErrors</arg><arg>false</arg>
+         * <arg>-skipChecksum</arg><arg>false</arg>
+         * <arg>-removeDeletedFiles</arg><arg>true</arg>
+         * <arg>-preserveBlockSize</arg><arg>false</arg>
+         * <arg>-preserveReplicationNumber</arg><arg>true</arg>
+         * <arg>-preservePermission</arg><arg>false</arg>
+         */
+        final String[] optionalArgs = {
+            "true",
+            "-maxMaps", "3",
+            "-mapBandwidth", "4",
+            "-sourcePaths", "hdfs://localhost:8020/tmp/",
+            "-targetPath", "hdfs://localhost1:8020/tmp/",
+            "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
+            "-overwrite", "true",
+            "-ignoreErrors", "false",
+            "-skipChecksum", "false",
+            "-removeDeletedFiles", "true",
+            "-preserveBlockSize", "false",
+            "-preserveReplicationNumber", "true",
+            "-preservePermission", "false",
+        };
+
+        FeedReplicator replicator = new FeedReplicator();
+        CommandLine cmd = replicator.getCommand(optionalArgs);
+        DistCpOptions options = replicator.getDistCpOptions(cmd);
+
+        List<Path> srcPaths = new ArrayList<Path>();
+        srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
+        validateMandatoryArguments(options, srcPaths, false);
+        validateOptionalArguments(options);
+    }
+
+    private void validateMandatoryArguments(DistCpOptions options, List<Path> srcPaths, boolean shouldSyncFolder) {
         Assert.assertEquals(options.getMaxMaps(), 3);
         Assert.assertEquals(options.getMapBandwidth(), 4);
         Assert.assertEquals(options.getSourcePaths(), srcPaths);
         Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost1:8020/tmp/"));
+        Assert.assertEquals(options.shouldSyncFolder(), shouldSyncFolder);
+    }
+
+    private void validateOptionalArguments(DistCpOptions options) {
+        Assert.assertTrue(options.shouldOverwrite());
+        Assert.assertFalse(options.shouldIgnoreFailures());
+        Assert.assertFalse(options.shouldSkipCRC());
+        Assert.assertTrue(options.shouldDeleteMissing());
+        Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.BLOCKSIZE));
+        Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.REPLICATION));
+        Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.PERMISSION));
     }
 }