Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/08 11:46:54 UTC

git commit: FALCON-357 HCatalog Feed replication: Hive export job fails when table partition contains multiple dated columns. Contributed by Satish Mittal

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 64c2cb573 -> 5626b2dd1


FALCON-357 HCatalog Feed replication: Hive export job fails when table partition contains multiple dated columns. Contributed by Satish Mittal


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5626b2dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5626b2dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5626b2dd

Branch: refs/heads/master
Commit: 5626b2dd1d63a9762806e0e3a20d9624a46005c5
Parents: 64c2cb5
Author: Shwetha GS <sh...@inmobi.com>
Authored: Tue Jul 8 15:16:42 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Tue Jul 8 15:16:42 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 build-tools/src/bin/build-oozie.sh              |   6 +-
 build-tools/src/patches/OOZIE-1741.patch        | 397 +++++++++++++++++++
 .../workflow/OozieFeedWorkflowBuilder.java      |   2 +-
 .../converter/OozieFeedWorkflowBuilderTest.java |   2 +-
 .../workflow/OozieProcessWorkflowBuilder.java   |   2 +
 .../OozieProcessWorkflowBuilderTest.java        |   1 +
 7 files changed, 410 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
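
In short, the commit swaps the Hive replication partition property from a partition *filter* to a partition value list and pulls in OOZIE-1741 to provide the new coord:dataInPartitions EL function. The sketch below is illustrative only and not part of the commit; the partition columns, database, table and staging path are placeholders taken from the documentation added further down. It shows the shape of the old and new property values and the Hive export statement the new form feeds into.

// Illustrative sketch only -- not part of the commit. Shows why the old
// dataInPartitionFilter value cannot be used in a Hive EXPORT statement once a
// table is partitioned on more than one (dated) column, and what the new
// dataInPartitions-based value looks like instead.
public class PartitionSpecBeforeAfter {

    // Before: a filter expression (roughly "(year='2014' AND month='07' AND ...)"),
    // which Hive's "partition (...)" clause does not accept.
    static final String OLD_VALUE = "${coord:dataInPartitionFilter('input', 'hive')}";

    // After: a comma-separated partition value list wrapped in parentheses,
    // e.g. "(year='2014',month='07',day='08',hour='01')".
    static final String NEW_VALUE = "(${coord:dataInPartitions('input', 'hive-export')})";

    // Mirrors the table-export.hql template documented later in this patch.
    static String exportStatement(String db, String table, String partitionSpec, String stagingDir) {
        return "export table " + db + "." + table
                + " partition " + partitionSpec
                + " to '" + stagingDir + "';";
    }

    public static void main(String[] args) {
        System.out.println(exportStatement("myInputDatabase1", "myInputTable1",
                "(year='2014',month='07',day='08',hour='01')",
                "hdfs://bar:8020/staging/2014-07-08-01/data"));
    }
}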


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b7d148..56c9ff6 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-357 HCatalog Feed replication: Hive export job fails when table partition 
+   contains multiple dated columns. (Satish Mittal via Shwetha GS)
+
    FALCON-495 multi source single target feed replication failing in regression.
    (Satish Mittal via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/build-tools/src/bin/build-oozie.sh
----------------------------------------------------------------------
diff --git a/build-tools/src/bin/build-oozie.sh b/build-tools/src/bin/build-oozie.sh
index 625613c..d7e99ba 100755
--- a/build-tools/src/bin/build-oozie.sh
+++ b/build-tools/src/bin/build-oozie.sh
@@ -79,6 +79,10 @@ case $VERSION in
     ;;
 4.0.0 )
     patch -p1 --verbose < ../../build-tools/src/patches/OOZIE-1551-4.0.patch
+    patch -p0 < ../../build-tools/src/patches/OOZIE-1741.patch
+    ;;
+4.0.1 )
+    patch -p0 < ../../build-tools/src/patches/OOZIE-1741.patch
     ;;
 esac
 
@@ -86,4 +90,4 @@ rm `find . -name 'pom.xml.bak'`
 
 $MVN_CMD clean install -DskipTests
 
-popd
\ No newline at end of file
+popd

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/build-tools/src/patches/OOZIE-1741.patch
----------------------------------------------------------------------
diff --git a/build-tools/src/patches/OOZIE-1741.patch b/build-tools/src/patches/OOZIE-1741.patch
new file mode 100644
index 0000000..e69b2d9
--- /dev/null
+++ b/build-tools/src/patches/OOZIE-1741.patch
@@ -0,0 +1,397 @@
+diff --git core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
+index e5f0146..9a36af0 100644
+--- core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
++++ core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
+@@ -115,6 +115,12 @@ public class HCatELFunctions {
+         return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
+     }
+
++    public static String ph1_coord_dataInPartitions_echo(String dataInName, String type) {
++        // Checking if the dataIn/dataOut is correct?
++        isValidDataEvent(dataInName);
++        return echoUnResolved("dataInPartitions", "'" + dataInName + "', '" + type + "'");
++    }
++
+     public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
+         // Checking if the dataIn/dataOut is correct?
+         isValidDataEvent(dataOutName);
+@@ -266,6 +272,47 @@ public class HCatELFunctions {
+     }
+
+     /**
++     * Used to specify the entire HCat partition that defines the input for a workflow job. <p/> Looks for two evaluator-level
++     * variables: <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the data-in HCat URI.
++     * <p/> B defines whether there is any unresolved EL function (i.e. latest). <p/> If anything is unresolved,
++     * this function echoes back the original function; <p/> otherwise it returns the partition.
++     *
++     * @param dataInName : DataIn name
++     * @param type : for action type: hive-export
++     */
++    public static String ph3_coord_dataInPartitions(String dataInName, String type) {
++        ELEvaluator eval = ELEvaluator.getCurrent();
++        String uri = (String) eval.getVariable(".datain." + dataInName);
++        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
++        if (unresolved != null && unresolved.booleanValue() == true) {
++            return "${coord:dataInPartitions('" + dataInName + "', '" + type + "')}";
++        }
++        String partitionValue = null;
++        if (uri != null) {
++            if (type.equals("hive-export")) {
++                String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR);
++                if (uriList.length > 1) {
++                    throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: "
++                        + dataInName + " URI: " + uri);
++                }
++                try {
++                    partitionValue = new HCatURI(uri).toPartitionValueString(type);
++                }
++                catch (URISyntaxException e) {
++                    throw new RuntimeException("Parsing exception for HCatURI " + uri, e);
++                }
++            } else {
++                  throw new RuntimeException("Unsupported type: " + type + " dataset name: " + dataInName);
++            }
++        }
++        else {
++            XLog.getLog(HCatELFunctions.class).warn("URI is null");
++            return null;
++        }
++        return partitionValue;
++    }
++
++    /**
+      * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
+      * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
+      * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
+diff --git core/src/main/resources/oozie-default.xml core/src/main/resources/oozie-default.xml
+index 455ef9d..889f10d 100644
+--- core/src/main/resources/oozie-default.xml
++++ core/src/main/resources/oozie-default.xml
+@@ -837,6 +837,7 @@
+             coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionFilter_echo,
+             coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMin_echo,
+             coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitionMax_echo,
++            coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataInPartitions_echo,
+             coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitions_echo,
+             coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph1_coord_dataOutPartitionValue_echo
+         </value>
+@@ -1101,6 +1102,7 @@
+             coord:dataInPartitionFilter=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionFilter,
+             coord:dataInPartitionMin=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMin,
+             coord:dataInPartitionMax=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitionMax,
++            coord:dataInPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataInPartitions,
+             coord:dataOutPartitions=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitions,
+             coord:dataOutPartitionValue=org.apache.oozie.coord.HCatELFunctions#ph3_coord_dataOutPartitionValue
+         </value>
+diff --git core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
+index f46f1ec..fac2177 100644
+--- core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
++++ core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
+@@ -264,6 +264,38 @@ public class TestHCatELFunctions extends XHCatTestCase {
+     }
+
+     /**
++     * Test HCat dataInPartitions EL function (phase 1), which echoes back the EL
++     * function itself
++     *
++     * @throws Exception
++     */
++    @Test
++    public void testDataInPartitionsPh1() throws Exception {
++        init("coord-job-submit-data");
++        String expr = "${coord:dataInPartitions('ABC', 'hive-export')}";
++        // +ve test
++        eval.setVariable("oozie.dataname.ABC", "data-in");
++        assertEquals("${coord:dataInPartitions('ABC', 'hive-export')}", CoordELFunctions.evalAndWrap(eval, expr));
++        // -ve test
++        expr = "${coord:dataInPartitions('ABCD', 'hive-export')}";
++        try {
++            CoordELFunctions.evalAndWrap(eval, expr);
++            fail("should throw exception because Data-in is not defined");
++        }
++        catch (Exception ex) {
++        }
++        // -ve test
++        expr = "${coord:dataInPartitions('ABCD')}";
++        eval.setVariable("oozie.dataname.ABCD", "data-in");
++        try {
++            CoordELFunctions.evalAndWrap(eval, expr);
++            fail("should throw exception because EL function requires 2 parameters");
++        }
++        catch (Exception ex) {
++        }
++    }
++
++    /**
+      * Test HCat dataOutPartition EL function (phase 1) which echo back the EL
+      * function itself
+      *
+@@ -463,6 +495,31 @@ public class TestHCatELFunctions extends XHCatTestCase {
+         assertTrue(res.equals("20"));
+     }
+
++    /**
++     * Test dataInPartitions EL function (phase 3), which returns the complete partition value string of a single partition
++     * in the case of the hive-export type.
++     *
++     * @throws Exception
++     */
++    @Test
++    public void testDataInPartitions() throws Exception {
++        init("coord-action-start");
++        String expr = "${coord:dataInPartitions('ABC', 'hive-export')}";
++        eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
++        eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
++        String res = CoordELFunctions.evalAndWrap(eval, expr);
++        assertTrue(res.equals("datastamp='20120230',region='us'") || res.equals("region='us',datastamp='20120230'"));
++        // -ve test; execute EL function with any other type than hive-export
++        try {
++            expr = "${coord:dataInPartitions('ABC', 'invalid-type')}";
++            eval.setVariable(".datain.ABC", "hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us");
++            eval.setVariable(".datain.ABC.unresolved", Boolean.FALSE);
++            res = CoordELFunctions.evalAndWrap(eval, expr);
++            fail("EL function should throw exception because of invalid type");
++        } catch (Exception e) {
++        }
++    }
++
+     private void init(String tag) throws Exception {
+         init(tag, "hdfs://localhost:9000/user/" + getTestUser() + "/US/${YEAR}/${MONTH}/${DAY}");
+     }
+diff --git docs/src/site/twiki/CoordinatorFunctionalSpec.twiki docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+index a5ecbc5..621bd3d 100644
+--- docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
++++ docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+@@ -2608,6 +2608,192 @@ C = foreach B generate foo, bar;
+ store C into 'myOutputDatabase.myOutputTable' using org.apache.hcatalog.pig.HCatStorer('region=APAC,datestamp=20090102');
+ </blockquote>
+
++---++++ 6.8.8 coord:dataInPartitions(String name, String type) EL function
++
++The =${coord:dataInPartitions(String name, String type)}= EL function resolves to a list of partition key-value
++pairs for the input-event dataset. Currently the only supported type is 'hive-export'. The 'hive-export' type
++supports a single partition instance and produces the complete partition value string that can
++be used in a Hive query for partition export/import.
++
++The example below illustrates a Hive export-import job triggered by a coordinator, using the EL functions for the HCat
++database, table, and input partitions. The example replicates the hourly processed data across Hive tables.
++
++*%GREEN% Example: %ENDCOLOR%*
++
++#HCatHiveExampleOne
++
++*Coordinator application definition:*
++
++<blockquote>
++    <coordinator-app xmlns="uri:oozie:coordinator:0.3" name="app-coord"
++    frequency="${coord:hours(1)}" start="2014-03-28T08:00Z"
++    end="2030-01-01T00:00Z" timezone="UTC">
++
++    <datasets>
++        <dataset name="Stats-1" frequency="${coord:hours(1)}"
++            initial-instance="2014-03-28T08:00Z" timezone="UTC">
++            <uri-template>hcat://foo:11002/myInputDatabase1/myInputTable1/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}
++            </uri-template>
++        </dataset>
++        <dataset name="Stats-2" frequency="${coord:hours(1)}"
++            initial-instance="2014-03-28T08:00Z" timezone="UTC">
++            <uri-template>hcat://foo:11002/myInputDatabase2/myInputTable2/year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}
++            </uri-template>
++        </dataset>
++    </datasets>
++    <input-events>
++        <data-in name="processed-logs-1" dataset="Stats-1">
++            <instance>${coord:current(0)}</instance>
++        </data-in>
++    </input-events>
++    <output-events>
++        <data-out name="processed-logs-2" dataset="Stats-2">
++            <instance>${coord:current(0)}</instance>
++        </data-out>
++    </output-events>
++    <action>
++      <workflow>
++        <app-path>hdfs://bar:8020/usr/joe/logsreplicator-wf</app-path>
++        <configuration>
++          <property>
++            <name>EXPORT_DB</name>
++            <value>${coord:databaseIn('processed-logs-1')}</value>
++          </property>
++          <property>
++            <name>EXPORT_TABLE</name>
++            <value>${coord:tableIn('processed-logs-1')}</value>
++          </property>
++          <property>
++            <name>IMPORT_DB</name>
++            <value>${coord:databaseOut('processed-logs-2')}</value>
++          </property>
++          <property>
++            <name>IMPORT_TABLE</name>
++            <value>${coord:tableOut('processed-logs-2')}</value>
++          </property>
++          <property>
++            <name>EXPORT_PARTITION</name>
++            <value>${coord:dataInPartitions('processed-logs-1', 'hive-export')}</value>
++          </property>
++          <property>
++            <name>EXPORT_PATH</name>
++            <value>hdfs://bar:8020/staging/${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH')}/data</value>
++          </property>
++        </configuration>
++      </workflow>
++    </action>
++</coordinator-app>
++</blockquote>
++
++Parameterizing the input/output databases and tables using the corresponding EL functions as shown above makes them
++available in the Hive actions of the workflow 'logsreplicator-wf'.
++
++Each coordinator action uses the hourly instances of the 'processed-logs-1' dataset as its input events. The
++=${coord:dataInPartitions(String name, String type)}= function enables the coordinator application to pass the
++partition corresponding to the hourly dataset instance to the workflow job triggered by the coordinator action.
++The workflow passes this partition value to the Hive export script, which exports the hourly partition from the source
++database to the staging location referred to as =EXPORT_PATH=. The Hive import script then imports the hourly partition
++from the =EXPORT_PATH= staging location into the target database.
++
++#HCatWorkflow
++
++*Workflow definition:*
++
++<blockquote>
++<workflow-app xmlns="uri:oozie:workflow:0.3" name="logsreplicator-wf">
++    <start to="table-export"/>
++    <action name="table-export">
++        <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2">
++            <job-tracker>${jobTracker}</job-tracker>
++            <name-node>${nameNode}</name-node>
++            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
++            <configuration>
++                <property>
++                    <name>mapred.job.queue.name</name>
++                    <value>${queueName}</value>
++                </property>
++                <property>
++                    <name>oozie.launcher.mapred.job.priority</name>
++                    <value>${jobPriority}</value>
++                </property>
++            </configuration>
++            <script>${wf:appPath()}/scripts/table-export.hql</script>
++            <param>sourceDatabase=${EXPORT_DB}</param>
++            <param>sourceTable=${EXPORT_TABLE}</param>
++            <param>sourcePartition=${EXPORT_PARTITION}</param>
++            <param>sourceStagingDir=${EXPORT_PATH}</param>
++        </hive:hive>
++        <ok to="table-import"/>
++        <error to="fail"/>
++    </action>
++    <action name="table-import">
++        <hive:hive xmlns:hive="uri:oozie:hive-action:0.2" xmlns="uri:oozie:hive-action:0.2">
++            <job-tracker>${jobTracker}</job-tracker>
++            <name-node>${nameNode}</name-node>
++            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
++            <configuration>
++                <property>
++                    <name>mapred.job.queue.name</name>
++                    <value>${queueName}</value>
++                </property>
++                <property>
++                    <name>oozie.launcher.mapred.job.priority</name>
++                    <value>${jobPriority}</value>
++                </property>
++            </configuration>
++            <script>${wf:appPath()}/scripts/table-import.hql</script>
++            <param>targetDatabase=${IMPORT_DB}</param>
++            <param>targetTable=${IMPORT_TABLE}</param>
++            <param>targetPartition=${EXPORT_PARTITION}</param>
++            <param>sourceStagingDir=${EXPORT_PATH}</param>
++        </hive:hive>
++        <ok to="end"/>
++        <error to="fail"/>
++    </action>
++    <kill name="fail">
++        <message>
++            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
++        </message>
++    </kill>
++    <end name="end"/>
++</workflow-app>
++</blockquote>
++
++Ensure that the following jars are on the classpath, with versions corresponding to the HCatalog installation:
++hcatalog-core.jar, webhcat-java-client.jar, hive-common.jar, hive-exec.jar, hive-metastore.jar, hive-serde.jar,
++libfb303.jar. The hive-site.xml needs to be present on the classpath as well.
++
++*Example Hive Export script:*
++The following script exports a particular Hive table partition into the staging location, where the partition value
++is computed through the =${coord:dataInPartitions(String name, String type)}= EL function.
++<blockquote>
++export table ${sourceDatabase}.${sourceTable} partition (${sourcePartition}) to '${sourceStagingDir}';
++</blockquote>
++
++For example, for the 2014-03-28T08:00Z run with the given dataset instances and
++=${coord:dataInPartitions('processed-logs-1', 'hive-export')}=, the above Hive script with resolved values would look like:
++<blockquote>
++export table myInputDatabase1.myInputTable1 partition (year='2014',month='03',day='28',hour='08') to 'hdfs://bar:8020/staging/2014-03-28-08/data';
++</blockquote>
++
++*Example Hive Import script:*
++The following script imports a particular Hive table partition from the staging location, where the partition value is
++computed through the =${coord:dataInPartitions(String name, String type)}= EL function.
++<blockquote>
++use ${targetDatabase};
++alter table ${targetTable} drop if exists partition (${targetPartition});
++import table ${targetTable} partition (${targetPartition}) from '${sourceStagingDir}';
++</blockquote>
++
++For example, for the 2014-03-28T08:00Z run with the given dataset instances and
++=${coord:dataInPartitions('processed-logs-1', 'hive-export')}=, the above Hive script with resolved values would look like:
++
++<blockquote>
++use myInputDatabase2;
++alter table myInputTable2 drop if exists partition (year='2014',month='03',day='28',hour='08');
++import table myInputTable2 partition (year='2014',month='03',day='28',hour='08') from 'hdfs://bar:8020/staging/2014-03-28-08';
++</blockquote>
++
+
+ ---+++ 6.9. Parameterization of Coordinator Application
+
+diff --git sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
+index d797f9b..4bc5048 100644
+--- sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
++++ sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
+@@ -260,6 +260,35 @@ public class HCatURI {
+         return filter.toString();
+     }
+
++    /**
++     * Get the entire partition value string from the partition map.
++     * In the case of the hive-export type, it can be used to create the entire partition value string
++     * that can be used in a Hive query for partition export/import.
++     *
++     * @param type e.g. hive-export
++     * @return partition value string
++     */
++    public String toPartitionValueString(String type) {
++        StringBuilder value = new StringBuilder();
++        if (type.equals("hive-export")) {
++            String comparator = "=";
++            String separator = ",";
++            for (Map.Entry<String, String> entry : partitions.entrySet()) {
++                if (value.length() > 1) {
++                    value.append(separator);
++                }
++                value.append(entry.getKey());
++                value.append(comparator);
++                value.append(PARTITION_VALUE_QUOTE);
++                value.append(entry.getValue());
++                value.append(PARTITION_VALUE_QUOTE);
++            }
++        } else {
++            throw new RuntimeException("Unsupported type: " + type);
++        }
++        return value.toString();
++    }
++
+     @Override
+     public String toString() {
+         StringBuilder sb = new StringBuilder();
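
The behaviour of the new ph3_coord_dataInPartitions / HCatURI.toPartitionValueString pair is easiest to see on the URI used in testDataInPartitions above. The following is a minimal, self-contained sketch (hypothetical class name, simplified quoting) of the same joining logic and the value string it yields:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for HCatURI.toPartitionValueString("hive-export"):
// joins each partition as key='value', comma-separated.
public class PartitionValueStringSketch {

    static String toPartitionValueString(Map<String, String> partitions) {
        StringBuilder value = new StringBuilder();
        for (Map.Entry<String, String> entry : partitions.entrySet()) {
            if (value.length() > 0) {
                value.append(',');
            }
            value.append(entry.getKey()).append('=')
                 .append('\'').append(entry.getValue()).append('\'');
        }
        return value.toString();
    }

    public static void main(String[] args) {
        // Partitions parsed from
        // hcat://hcat.server.com:5080/mydb/clicks/datastamp=20120230;region=us
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("datastamp", "20120230");
        partitions.put("region", "us");
        // Prints: datastamp='20120230',region='us'
        System.out.println(toPartitionValueString(partitions));
    }
}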

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 6d36840..4e300bf 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -622,7 +622,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
 
             props.put(prefix + "Database", tableStorage.getDatabase());
             props.put(prefix + "Table", tableStorage.getTable());
-            props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
+            props.put(prefix + "Partition", "(${coord:dataInPartitions('input', 'hive-export')})");
         }
 
         private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
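
For reference, a minimal sketch of the replication properties the feed builder now emits for a table-storage feed; the prefix, database and table values are placeholders rather than the actual Falcon names. Note that the builder itself wraps the value in parentheses so it can be dropped straight into the export/import scripts' partition clause.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: how the table-storage replication properties look
// after this change. Prefix, database and table values are placeholders.
public class FeedReplicationPropsSketch {
    public static void main(String[] args) {
        String prefix = "falconInput";                       // placeholder prefix
        Map<String, String> props = new LinkedHashMap<>();
        props.put(prefix + "Database", "myInputDatabase1");  // tableStorage.getDatabase()
        props.put(prefix + "Table", "myInputTable1");        // tableStorage.getTable()
        // New: a parenthesised partition value list instead of a partition filter,
        // ready for "export table db.tbl partition <value> to '<staging>';".
        props.put(prefix + "Partition", "(${coord:dataInPartitions('input', 'hive-export')})");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}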

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index 5d6879a..d793e65 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -537,7 +537,7 @@ public class OozieFeedWorkflowBuilderTest {
 
         Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
         Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
-        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
+        Assert.assertEquals(props.get(prefix + "Partition"), "(${coord:dataInPartitions('input', 'hive-export')})");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 70aeebd..3751f95 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -545,6 +545,8 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
             "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
         props.put(prefix + "_partition_filter_java",
             "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+        props.put(prefix + "_datain_partitions_hive",
+            "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}");
     }
 
     private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
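
And on the process side, a similar sketch (again with a placeholder prefix and input name) of the partition-related properties a catalog-table input now propagates; the last entry is the addition from this commit.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: the partition-related properties a catalog-table
// input of a process carries, mirroring the hunk above plus the existing filters.
public class ProcessInputPropsSketch {
    public static void main(String[] args) {
        String prefix = "falcon_input";   // placeholder; derived from the input name
        Map<String, String> props = new LinkedHashMap<>();
        props.put(prefix + "_partition_filter_pig",  "${coord:dataInPartitionFilter('input', 'pig')}");
        props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
        props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
        // New in this commit: a ready-made partition value list for hive export/import.
        props.put(prefix + "_datain_partitions_hive", "${coord:dataInPartitions('input', 'hive-export')}");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}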

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5626b2dd/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 2522ca3..1eeadaf 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -617,6 +617,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
             props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
             props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
             props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+            props.put(prefix + "_datain_partitions_hive", "${coord:dataInPartitions('input', 'hive-export')}");
         } else if (prefix.equals("falcon_output")) {
             props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
         }