You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/09 01:44:42 UTC
[3/5] git commit: FALCON-20 Remove dependency on custom InMobi DistCp. Contributed by Sowmya Ramesh
FALCON-20 Remove dependency on custom InMobi DistCp. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d82c01da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d82c01da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d82c01da
Branch: refs/heads/master
Commit: d82c01dafc51bba758f5c7be21e5b43219e12f2c
Parents: b0929b4
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Oct 8 15:22:00 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Oct 8 16:21:37 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +++
common/src/main/resources/startup.properties | 2 +-
hadoop-dependencies/pom.xml | 6 +++++
hadoop-webapp/pom.xml | 6 +++++
.../action/feed/replication-action.xml | 5 +++-
pom.xml | 26 ++++++--------------
replication/pom.xml | 9 +++----
.../falcon/replication/CustomReplicator.java | 2 ++
.../falcon/replication/FeedReplicator.java | 12 ++++-----
.../replication/FilteredCopyListingTest.java | 11 +++++++--
src/conf/startup.properties | 2 +-
11 files changed, 48 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dca9fa9..c5ba06c 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-20 Remove dependency on custom InMobi DistCp (Sowmya Ramesh via
+ Venkatesh Seetharam)
+
FALCON-758 Discontinue support for Oozie-3.x (Peeyush Bishnoi via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 99aa3c0..433c2a8 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -49,7 +49,7 @@
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-core,geronimo-j2ee-management,hadoop-distcp,jms,json-simple,oozie-client,spring-jms
+*.shared.libs=activemq-core,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms
##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/hadoop-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/pom.xml b/hadoop-dependencies/pom.xml
index b3e3ebc..7b0aa1d 100644
--- a/hadoop-dependencies/pom.xml
+++ b/hadoop-dependencies/pom.xml
@@ -102,6 +102,12 @@
<artifactId>jersey-core</artifactId>
<scope>compile</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index 1228074..236c269 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -87,6 +87,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<scope>compile</scope>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/oozie/src/main/resources/action/feed/replication-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
index dffdd92..5d154b4 100644
--- a/oozie/src/main/resources/action/feed/replication-action.xml
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -33,6 +33,10 @@
<name>oozie.launcher.mapred.job.priority</name>
<value>${jobPriority}</value>
</property>
+ <property> <!-- DistCp jars -->
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp</value>
+ </property>
</configuration>
<main-class>org.apache.falcon.replication.FeedReplicator</main-class>
<arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
@@ -48,7 +52,6 @@
<arg>${distcpTargetPaths}</arg>
<arg>-falconFeedStorageType</arg>
<arg>${falconFeedStorageType}</arg>
- <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
</java>
<ok to="succeeded-post-processing"/>
<error to="failed-post-processing"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 02473b1..ed046a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
<activemq.version>5.4.3</activemq.version>
<hive.version>0.11.0</hive.version>
<hcatalog.version>0.11.0</hcatalog.version>
- <hadoop-distcp.version>0.11</hadoop-distcp.version>
<jetty.version>6.1.26</jetty.version>
<jersey.version>1.9</jersey.version>
<internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
@@ -236,6 +235,13 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>
@@ -779,24 +785,6 @@
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop.inmobi.tools</groupId>
- <artifactId>hadoop-distcp</artifactId>
- <version>${hadoop-distcp.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-messaging</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 9dba461..47e5f6a 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -42,6 +42,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ </dependency>
</dependencies>
</profile>
</profiles>
@@ -58,11 +62,6 @@
</dependency>
<dependency>
- <groupId>org.apache.hadoop.inmobi.tools</groupId>
- <artifactId>hadoop-distcp</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
index 164bfb0..d99fe20 100644
--- a/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/CustomReplicator.java
@@ -36,6 +36,7 @@ import java.io.IOException;
public class CustomReplicator extends DistCp {
private static final Logger LOG = LoggerFactory.getLogger(CustomReplicator.class);
+ private final DistCpOptions inputOptions;
/**
* Public Constructor. Creates DistCp object with specified input-parameters.
@@ -47,6 +48,7 @@ public class CustomReplicator extends DistCp {
*/
public CustomReplicator(Configuration configuration, DistCpOptions inputOptions) throws Exception {
super(configuration, inputOptions);
+ this.inputOptions = inputOptions;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index b6d2fa3..d927fff 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -20,7 +20,6 @@ package org.apache.falcon.replication;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.Storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -44,6 +43,7 @@ import java.util.regex.Pattern;
public class FeedReplicator extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(FeedReplicator.class);
+ private static final String IGNORE = "IGNORE";
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new FeedReplicator(), args);
@@ -62,16 +62,15 @@ public class FeedReplicator extends Configured implements Tool {
LOG.info("{} found conf ? {}", confPath, confPath.getFileSystem(conf).exists(confPath));
conf.addResource(confPath);
- String falconFeedStorageType = cmd.getOptionValue("falconFeedStorageType").trim();
- Storage.TYPE feedStorageType = Storage.TYPE.valueOf(falconFeedStorageType);
+ final boolean includePathSet = !IGNORE.equalsIgnoreCase(conf.get("falcon.include.path"));
- DistCp distCp = (feedStorageType == Storage.TYPE.FILESYSTEM)
+ DistCp distCp = (includePathSet)
? new CustomReplicator(conf, options)
: new DistCp(conf, options);
LOG.info("Started DistCp");
distCp.execute();
- if (feedStorageType == Storage.TYPE.FILESYSTEM) {
+ if (includePathSet) {
executePostProcessing(options); // this only applies for FileSystem Storage.
}
@@ -116,8 +115,7 @@ public class FeedReplicator extends Configured implements Tool {
distcpOptions.setSyncFolder(true);
distcpOptions.setBlocking(true);
distcpOptions.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
- distcpOptions.setMapBandwidthKB(Integer.valueOf(cmd.getOptionValue("mapBandwidthKB")));
-
+ distcpOptions.setMapBandwidth(Integer.valueOf(cmd.getOptionValue("mapBandwidthKB")));
return distcpOptions;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
index e308866..09e343b 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FilteredCopyListingTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.testng.Assert;
@@ -117,6 +118,7 @@ public class FilteredCopyListingTest {
Path target = new Path(fileSystemPath.toString() + "///tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+ options.setSyncFolder(true);
new FilteredCopyListing(new Configuration(), CREDENTIALS).buildListing(listingPath, options);
@@ -132,6 +134,7 @@ public class FilteredCopyListingTest {
Path target = new Path(fileSystemPath.toString() + "///tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+ options.setSyncFolder(true);
Configuration configuration = new Configuration();
configuration.set("falcon.include.path", "*/3/*");
@@ -149,6 +152,7 @@ public class FilteredCopyListingTest {
Path target = new Path(fileSystemPath.toString() + "///tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+ options.setSyncFolder(true);
Configuration configuration = new Configuration();
configuration.set("falcon.include.path", "*/3/?");
@@ -166,6 +170,7 @@ public class FilteredCopyListingTest {
Path target = new Path(fileSystemPath.toString() + "///tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+ options.setSyncFolder(true);
Configuration configuration = new Configuration();
configuration.set("falcon.include.path", "*/3/[47]");
@@ -183,6 +188,7 @@ public class FilteredCopyListingTest {
Path target = new Path(fileSystemPath.toString() + "///tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+ options.setSyncFolder(true);
Configuration configuration = new Configuration();
configuration.set("falcon.include.path", "*/3/40");
@@ -200,6 +206,7 @@ public class FilteredCopyListingTest {
Path target = new Path(fileSystemPath.toString() + "///tmp/target");
Path listingPath = new Path(fileSystemPath.toString() + "///tmp/META/fileList.seq");
DistCpOptions options = new DistCpOptions(Arrays.asList(source), target);
+ options.setSyncFolder(true);
Configuration configuration = new Configuration();
configuration.set("falcon.include.path", "*/3/{4,7}");
@@ -212,13 +219,13 @@ public class FilteredCopyListingTest {
SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.getLocal(new Configuration()),
listingPath, new Configuration());
Text key = new Text();
- FileStatus value = new FileStatus();
+ FileStatus value = new CopyListingFileStatus();
Map<String, String> actualValues = new HashMap<String, String>();
while (reader.next(key, value)) {
actualValues.put(value.getPath().toString(), key.toString());
}
- Assert.assertEquals(expected == -1 ? EXPECTED_VALUES.size() : expected, actualValues.size());
+ Assert.assertEquals(actualValues.size(), expected == -1 ? EXPECTED_VALUES.size() : expected);
for (Map.Entry<String, String> entry : actualValues.entrySet()) {
Assert.assertEquals(entry.getValue(), EXPECTED_VALUES.get(entry.getKey()));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d82c01da/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 78466af..2db4b1e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -61,7 +61,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-core,ant,geronimo-j2ee-management,hadoop-distcp,jms,json-simple,oozie-client,spring-jms
+*.shared.libs=activemq-core,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms
##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=