You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/04/24 11:18:57 UTC

git commit: FALCON-391 Add ability to set mapBandwidth. Contributed by Michael Miklavcic

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 946182a09 -> 7fa6ca57e


FALCON-391 Add ability to set mapBandwidth. Contributed by Michael Miklavcic


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7fa6ca57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7fa6ca57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7fa6ca57

Branch: refs/heads/master
Commit: 7fa6ca57e0b1121fc07de999932f5b8546222b1b
Parents: 946182a
Author: Shwetha GS <sh...@gmail.com>
Authored: Thu Apr 24 14:48:50 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Thu Apr 24 14:48:50 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 common/src/main/resources/runtime.properties    |   3 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   4 +-
 .../workflow/OozieFeedWorkflowBuilder.java      | 102 ++++++++++---------
 .../config/workflow/replication-workflow.xml    |   2 +
 .../converter/OozieFeedWorkflowBuilderTest.java |   4 +-
 feed/src/test/resources/fs-replication-feed.xml |   1 +
 .../falcon/replication/FeedReplicator.java      |   6 ++
 .../falcon/replication/FeedReplicatorTest.java  |   7 +-
 9 files changed, 79 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d123bc7..27c2056 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
     (Venkatesh Seetharam)
    
   IMPROVEMENTS
+    FALCON-391 Add ability to set mapBandwidth. (Michael Miklavcic via Shwetha GS)
+
     FALCON-380 The dependency option doesn't mention input or output for a feed.
     (Suhas Vasu via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 22b1050..0a4de94 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -26,4 +26,5 @@
 *.falcon.parentworkflow.retry.max=3
 *.falcon.parentworkflow.retry.interval.secs=1
 
-*.falcon.replication.workflow.maxmaps=5
\ No newline at end of file
+*.falcon.replication.workflow.maxmaps=5
+*.falcon.replication.workflow.mapbandwidthKB=102400
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 1910544..447a435 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -227,12 +227,14 @@ upto 8 hours then late-arrival's cut-off="hours(8)"
         <property name="jobPriority" value="VERY_HIGH"/>
         <property name="timeout" value="hours(1)"/>
         <property name="parallel" value="3"/>
+        <property name="maxMaps" value="8"/>
+        <property name="mapBandwidthKB" value="1024"/>
     </properties>
 </verbatim>
 A key-value pair, which are propagated to the workflow engine. "queueName" and "jobPriority" are special properties
 available to user to specify the hadoop job queue and priority, the same value is used by Falcons launcher job.
 "timeout" and "parallel" are other special properties which decides replication instance's timeout value while
-waiting for the feed instance and parallel decides the concurrent replication instances that can run at any given time.
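+waiting for the feed instance, and "parallel" decides the number of concurrent replication instances that can run at any given time. "maxMaps" sets the maximum number of maps used during replication, and "mapBandwidthKB" sets the bandwidth, in KB/s, used by each mapper during replication. If not specified, they default to the runtime properties falcon.replication.workflow.maxmaps (5) and falcon.replication.workflow.mapbandwidthKB (102400).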
  
 ---++ Process Specification
 A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines  the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.  

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 2008c2d..3c8d2f2 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -248,6 +248,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
 
     private class ReplicationOozieWorkflowMapper {
         private static final String MR_MAX_MAPS = "maxMaps";
+        private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
 
         private static final int THIRTY_MINUTES = 30 * 60 * 1000;
 
@@ -435,49 +436,47 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
         }
 
         private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
-            String wfName, Storage sourceStorage,
-            Storage targetStorage) throws FalconException {
+            String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
             ACTION replicationAction = new ACTION();
             WORKFLOW replicationWF = new WORKFLOW();
-            try {
-                replicationWF.setAppPath(getStoragePath(wfPath.toString()));
-                Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
-                props.put("srcClusterName", srcCluster.getName());
-                props.put("srcClusterColo", srcCluster.getColo());
-                if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
-                    props.put(MR_MAX_MAPS, getDefaultMaxMaps());
-                }
 
-                // the storage type is uniform across source and target feeds for replication
-                props.put("falconFeedStorageType", sourceStorage.getType().name());
-
-                String instancePaths = null;
-                if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
-                    String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
-                    instancePaths = pathsWithPartitions;
-
-                    propagateFileSystemCopyProperties(pathsWithPartitions, props);
-                } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                    instancePaths = "${coord:dataIn('input')}";
-
-                    final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
-                    propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
-                    final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
-                    propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
-                    propagateTableCopyProperties(srcCluster, sourceTableStorage,
-                        trgCluster, targetTableStorage, props);
-                    setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
-                }
+            replicationWF.setAppPath(getStoragePath(wfPath.toString()));
+            Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+            props.put("srcClusterName", srcCluster.getName());
+            props.put("srcClusterColo", srcCluster.getColo());
+            if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+                props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+            }
+            if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
+                props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+            }
 
-                propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
-                propagateUserWorkflowProperties(props, "replication");
+            // the storage type is uniform across source and target feeds for replication
+            props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+            String instancePaths = null;
+            if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+                String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
+                instancePaths = pathsWithPartitions;
+
+                propagateFileSystemCopyProperties(pathsWithPartitions, props);
+            } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                instancePaths = "${coord:dataIn('input')}";
+
+                final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+                propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+                final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+                propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+                propagateTableCopyProperties(srcCluster, sourceTableStorage,
+                    trgCluster, targetTableStorage, props);
+                setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
+            }
 
-                replicationWF.setConfiguration(getCoordConfig(props));
-                replicationAction.setWorkflow(replicationWF);
+            propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
+            propagateUserWorkflowProperties(props, "replication");
 
-            } catch (Exception e) {
-                throw new FalconException("Unable to create replication workflow", e);
-            }
+            replicationWF.setConfiguration(getCoordConfig(props));
+            replicationAction.setWorkflow(replicationWF);
 
             return replicationAction;
         }
@@ -486,6 +485,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
             return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
         }
 
+        private String getDefaultMapBandwidth() {
+            return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidthKB", "102400");
+        }
+
         private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
             Feed feed) throws FalconException {
             String srcPart = FeedHelper.normalizePartitionExpression(
@@ -525,20 +528,23 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
         }
 
         private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
-            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
-            throws IOException, FalconException {
+            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath) throws FalconException {
             Configuration conf = ClusterHelper.getConfiguration(trgCluster);
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
 
-            // copy import export scripts to stagingDir
-            Path scriptPath = new Path(wfPath, "scripts");
-            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
-            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
-
-            // create hive conf to stagingDir
-            Path confPath = new Path(wfPath + "/conf");
-            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
-            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
+            try {
+                // copy import export scripts to stagingDir
+                Path scriptPath = new Path(wfPath, "scripts");
+                copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
+                copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
+
+                // create hive conf to stagingDir
+                Path confPath = new Path(wfPath + "/conf");
+                createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+                createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
+            } catch(IOException e) {
+                throw new FalconException(e);
+            }
         }
 
         private void copyHiveScript(FileSystem fs, Path scriptPath,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 34ef68b..205beb2 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -119,6 +119,8 @@
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
             <arg>-maxMaps</arg>
             <arg>${maxMaps}</arg>
+            <arg>-mapBandwidthKB</arg>
+            <arg>${mapBandwidthKB}</arg>
             <arg>-sourcePaths</arg>
             <arg>${distcpSourcePaths}</arg>
             <arg>-targetPath</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index 182d9cb..b0bb83b 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -214,6 +214,7 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals(props.get("queueName"), "default");
         Assert.assertEquals(props.get("jobPriority"), "NORMAL");
         Assert.assertEquals(props.get("maxMaps"), "5");
+        Assert.assertEquals(props.get("mapBandwidthKB"), "102400");
 
         assertLibExtensions(coord, "replication");
         assertWorkflowRetries(coord);
@@ -325,7 +326,7 @@ public class OozieFeedWorkflowBuilderTest {
 
         JAVA replication = replicationActionNode.getJava();
         List<String> args = replication.getArg();
-        Assert.assertEquals(args.size(), 11);
+        Assert.assertEquals(args.size(), 13);
 
         HashMap<String, String> props = new HashMap<String, String>();
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
@@ -338,6 +339,7 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
         Assert.assertEquals(props.get("maxMaps"), "33");
+        Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/feed/src/test/resources/fs-replication-feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/fs-replication-feed.xml b/feed/src/test/resources/fs-replication-feed.xml
index e0a448f..bada507 100644
--- a/feed/src/test/resources/fs-replication-feed.xml
+++ b/feed/src/test/resources/fs-replication-feed.xml
@@ -63,5 +63,6 @@
     <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
     <properties>
         <property name="maxMaps" value="33" />
+        <property name="mapBandwidthKB" value="2048" />
     </properties>
 </feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 16ef9c2..1a7d1db 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -85,6 +85,11 @@ public class FeedReplicator extends Configured implements Tool {
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("mapBandwidthKB", true,
+                "bandwidth per map (in KB) to use for this copy");
+        opt.setRequired(true);
+        options.addOption(opt);
+
         opt = new Option("sourcePaths", true,
                 "comma separtated list of source paths to be copied");
         opt.setRequired(true);
@@ -110,6 +115,7 @@ public class FeedReplicator extends Configured implements Tool {
         distcpOptions.setSyncFolder(true);
         distcpOptions.setBlocking(true);
         distcpOptions.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
+        distcpOptions.setMapBandwidthKB(Integer.valueOf(cmd.getOptionValue("mapBandwidthKB")));
 
         return distcpOptions;
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7fa6ca57/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index 2e3bf57..d64eb61 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -36,13 +36,16 @@ public class FeedReplicatorTest {
     public void testArguments() throws Exception {
         /*
          * <arg>-update</arg>
-         * <arg>-blocking</arg><arg>true</arg> <arg>-maxMaps</arg><arg>20</arg>
+         * <arg>-blocking</arg><arg>true</arg>
+         * <arg>-maxMaps</arg><arg>3</arg>
+         * <arg>-mapBandwidthKB</arg><arg>4096</arg>
          * <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
          * <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
          */
         final String[] args = {
             "true",
             "-maxMaps", "3",
+            "-mapBandwidthKB", "4096",
             "-sourcePaths", "hdfs://localhost:8020/tmp/",
             "-targetPath", "hdfs://localhost1:8020/tmp/",
             "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
@@ -54,6 +57,8 @@ public class FeedReplicatorTest {
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
+        Assert.assertEquals(options.getMaxMaps(), 3);
+        Assert.assertEquals(options.getMapBandwidth(), 4096);
         Assert.assertEquals(options.getSourcePaths(), srcPaths);
         Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost1:8020/tmp/"));
     }