You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/09 01:44:42 UTC

[3/5] git commit: FALCON-20 Remove dependency on custom InMobi DistCp. Contributed by Sowmya Ramesh

FALCON-20 Remove dependency on custom InMobi DistCp. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d82c01da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d82c01da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d82c01da

Branch: refs/heads/master
Commit: d82c01dafc51bba758f5c7be21e5b43219e12f2c
Parents: b0929b4
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Oct 8 15:22:00 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Oct 8 16:21:37 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 common/src/main/resources/startup.properties    |  2 +-
 hadoop-dependencies/pom.xml                     |  6 +++++
 hadoop-webapp/pom.xml                           |  6 +++++
 .../action/feed/replication-action.xml          |  5 +++-
 pom.xml                                         | 26 ++++++--------------
 replication/pom.xml                             |  9 +++----
 .../falcon/replication/CustomReplicator.java    |  2 ++
 .../falcon/replication/FeedReplicator.java      | 12 ++++-----
 .../replication/FilteredCopyListingTest.java    | 11 +++++++--
 src/conf/startup.properties                     |  2 +-
 11 files changed, 48 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dca9fa9..c5ba06c 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,9 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-20 Remove dependency on custom InMobi DistCp (Sowmya Ramesh via
+   Venkatesh Seetharam)
+
    FALCON-758 Discontinue support for Oozie-3.x (Peeyush Bishnoi via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 99aa3c0..433c2a8 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -49,7 +49,7 @@
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-core,geronimo-j2ee-management,hadoop-distcp,jms,json-simple,oozie-client,spring-jms
+*.shared.libs=activemq-core,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/hadoop-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/pom.xml b/hadoop-dependencies/pom.xml
index b3e3ebc..7b0aa1d 100644
--- a/hadoop-dependencies/pom.xml
+++ b/hadoop-dependencies/pom.xml
@@ -102,6 +102,12 @@
                     <artifactId>jersey-core</artifactId>
                     <scope>compile</scope>
                 </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                    <scope>compile</scope>
+                </dependency>
             </dependencies>
             <build>
                 <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index 1228074..236c269 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -87,6 +87,12 @@
                 </dependency>
 
                 <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
                     <groupId>com.sun.jersey</groupId>
                     <artifactId>jersey-client</artifactId>
                     <scope>compile</scope>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/oozie/src/main/resources/action/feed/replication-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
index dffdd92..5d154b4 100644
--- a/oozie/src/main/resources/action/feed/replication-action.xml
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -33,6 +33,10 @@
                 <name>oozie.launcher.mapred.job.priority</name>
                 <value>${jobPriority}</value>
             </property>
+            <property>  <!-- DistCp jars -->
+                <name>oozie.action.sharelib.for.java</name>
+                <value>distcp</value>
+            </property>
         </configuration>
         <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
         <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
@@ -48,7 +52,6 @@
         <arg>${distcpTargetPaths}</arg>
         <arg>-falconFeedStorageType</arg>
         <arg>${falconFeedStorageType}</arg>
-        <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
     </java>
     <ok to="succeeded-post-processing"/>
     <error to="failed-post-processing"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 02473b1..ed046a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
         <activemq.version>5.4.3</activemq.version>
         <hive.version>0.11.0</hive.version>
         <hcatalog.version>0.11.0</hcatalog.version>
-        <hadoop-distcp.version>0.11</hadoop-distcp.version>
         <jetty.version>6.1.26</jetty.version>
         <jersey.version>1.9</jersey.version>
         <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
@@ -236,6 +235,13 @@
                             </exclusion>
                         </exclusions>
                     </dependency>
+
+                    <dependency>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-distcp</artifactId>
+                        <version>${hadoop.version}</version>
+                        <scope>provided</scope>
+                    </dependency>
                 </dependencies>
           </dependencyManagement>
         </profile>
@@ -779,24 +785,6 @@
             </dependency>
 
             <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-auth</artifactId>
-                <version>${hadoop.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>org.apache.hadoop.inmobi.tools</groupId>
-                <artifactId>hadoop-distcp</artifactId>
-                <version>${hadoop-distcp.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.apache.hadoop</groupId>
-                        <artifactId>hadoop-core</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-
-            <dependency>
                 <groupId>org.apache.falcon</groupId>
                 <artifactId>falcon-messaging</artifactId>
                 <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 9dba461..47e5f6a 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -42,6 +42,10 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client</artifactId>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>
@@ -58,11 +62,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop.inmobi.tools</groupId>
-            <artifactId>hadoop-distcp</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
index 164bfb0..d99fe20 100644
--- a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 public class CustomReplicator extends DistCp {
 
     private static final Logger LOG = LoggerFactory.getLogger(CustomReplicator.class);
+    private final DistCpOptions inputOptions;
 
     /**
      * Public Constructor. Creates DistCp object with specified input-parameters.
@@ -47,6 +48,7 @@ public class CustomReplicator extends DistCp {
      */
     public CustomReplicator(Configuration configuration, DistCpOptions inputOptions) throws Exception {
         super(configuration, inputOptions);
+        this.inputOptions = inputOptions;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index b6d2fa3..d927fff 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -20,7 +20,6 @@ package org.apache.falcon.replication;
 import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,6 +43,7 @@ import java.util.regex.Pattern;
 public class FeedReplicator extends Configured implements Tool {
 
     private static final Logger LOG = LoggerFactory.getLogger(FeedReplicator.class);
+    private static final String IGNORE = "IGNORE";
 
     public static void main(String[] args) throws Exception {
         ToolRunner.run(new Configuration(), new FeedReplicator(), args);
@@ -62,16 +62,15 @@ public class FeedReplicator extends Configured implements Tool {
         LOG.info("{} found conf ? {}", confPath, confPath.getFileSystem(conf).exists(confPath));
         conf.addResource(confPath);
 
-        String falconFeedStorageType = cmd.getOptionValue("falconFeedStorageType").trim();
-        Storage.TYPE feedStorageType = Storage.TYPE.valueOf(falconFeedStorageType);
+        final boolean includePathSet = !IGNORE.equalsIgnoreCase(conf.get("falcon.include.path"));
 
-        DistCp distCp = (feedStorageType == Storage.TYPE.FILESYSTEM)
+        DistCp distCp = (includePathSet)
                 ? new CustomReplicator(conf, options)
                 : new DistCp(conf, options);
         LOG.info("Started DistCp");
         distCp.execute();
 
-        if (feedStorageType == Storage.TYPE.FILESYSTEM) {
+        if (includePathSet) {
             executePostProcessing(options);  // this only applies for FileSystem Storage.
         }
 
@@ -116,8 +115,7 @@ public class FeedReplicator extends Configured implements Tool {
         distcpOptions.setSyncFolder(true);
         distcpOptions.setBlocking(true);
         distcpOptions.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
-        distcpOptions.setMapBandwidthKB(Integer.valueOf(cmd.getOptionValue("mapBandwidthKB")));
-
+        distcpOptions.setMapBandwidth(Integer.valueOf(cmd.getOptionValue("mapBandwidthKB")));
         return distcpOptions;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
index e308866..09e343b 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.testng.Assert;
@@ -117,6 +118,7 @@ public class FilteredCopyListingTest {
         Path target = new Path(fileSystemPath.toString() + "///tmp/target");
         Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
         DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+        options.setSyncFolder(true);
 
         new FilteredCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
 
@@ -132,6 +134,7 @@ public class FilteredCopyListingTest {
         Path target = new Path(fileSystemPath.toString() + "///tmp/target");
         Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
         DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+        options.setSyncFolder(true);
 
         Configuration configuration = new Configuration();
         configuration.set("falcon.include.path", "*/3/*");
@@ -149,6 +152,7 @@ public class FilteredCopyListingTest {
         Path target = new Path(fileSystemPath.toString() + "///tmp/target");
         Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
         DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+        options.setSyncFolder(true);
 
         Configuration configuration = new Configuration();
         configuration.set("falcon.include.path", "*/3/?");
@@ -166,6 +170,7 @@ public class FilteredCopyListingTest {
         Path target = new Path(fileSystemPath.toString() + "///tmp/target");
         Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
         DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+        options.setSyncFolder(true);
 
         Configuration configuration = new Configuration();
         configuration.set("falcon.include.path", "*/3/[47]");
@@ -183,6 +188,7 @@ public class FilteredCopyListingTest {
         Path target = new Path(fileSystemPath.toString() + "///tmp/target");
         Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
         DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+        options.setSyncFolder(true);
 
         Configuration configuration = new Configuration();
         configuration.set("falcon.include.path", "*/3/40");
@@ -200,6 +206,7 @@ public class FilteredCopyListingTest {
         Path target = new Path(fileSystemPath.toString() + "///tmp/target");
         Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
         DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+        options.setSyncFolder(true);
 
         Configuration configuration = new Configuration();
         configuration.set("falcon.include.path", "*/3/{4,7}");
@@ -212,13 +219,13 @@ public class FilteredCopyListingTest {
         SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.getLocal(new Configuration()),
                 listingPath, new Configuration());
         Text key = new Text();
-        FileStatus value = new FileStatus();
+        FileStatus value = new CopyListingFileStatus();
         Map<String, String> actualValues = new HashMap<String, String>();
         while (reader.next(key, value)) {
             actualValues.put(value.getPath().toString(), key.toString());
         }
 
-        Assert.assertEquals(expected == -1 ? EXPECTED_VALUES.size() : expected, actualValues.size());
+        Assert.assertEquals(actualValues.size(), expected == -1 ? EXPECTED_VALUES.size() : expected);
         for (Map.Entry<String, String> entry : actualValues.entrySet()) {
             Assert.assertEquals(entry.getValue(), EXPECTED_VALUES.get(entry.getKey()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 78466af..2db4b1e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -61,7 +61,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
 
 ##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-core,ant,geronimo-j2ee-management,hadoop-distcp,jms,json-simple,oozie-client,spring-jms
+*.shared.libs=activemq-core,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=