Posted to commits@falcon.apache.org by so...@apache.org on 2015/10/29 02:08:38 UTC

[1/3] falcon git commit: FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran

Repository: falcon
Updated Branches:
  refs/heads/master 35006fe32 -> 89040a296


http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml b/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml
new file mode 100644
index 0000000..34424c9
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml
@@ -0,0 +1,47 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * -->
+
+<action name="db-import-sqoop" xmlns='uri:oozie:workflow:0.3'>
+    <sqoop xmlns="uri:oozie:sqoop-action:0.3">
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>mapred.compress.map.output</name>
+                <value>true</value>
+            </property>
+            <!-- Enable the Oozie sharelib; the database connector jars are assumed to be in it -->
+            <property>
+                <name>oozie.use.system.libpath</name>
+                <value>true</value>
+            </property>
+        </configuration>
+        <command>${sqoopCommand}</command>
+    </sqoop>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
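
At materialization time, Falcon substitutes ${sqoopCommand} with a full Sqoop
CLI string assembled by DatabaseImportWorkflowBuilder (added elsewhere in this
commit). A hedged illustration of the resolved element, using standard Sqoop
1.x import flags and values borrowed from the test fixtures in this commit;
the builder's exact output may differ:

    <command>import --connect jdbc:hsqldb:localhost/db1 --username SA --password sqoop --table CUSTOMER --split-by id --num-mappers 2 --target-dir /falcon/test/input/2015/01/01/00</command>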

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index df0d286..fa5d804 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -88,6 +88,8 @@
         <arg>${falconInputFeeds}</arg>
         <arg>-falconInPaths</arg>
         <arg>${falconInPaths}</arg>
+        <arg>-datasource</arg>
+        <arg>${datasource == 'NA' ? 'IGNORE' : datasource}</arg>
     </java>
     <ok to="end"/>
     <error to="fail"/>
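
A note on the EL ternary above: the coordinator is assumed to bind datasource
to the literal 'NA' when the feed has no import source, in which case the
post-processing action receives the IGNORE placeholder; for an import feed the
datasource entity name is passed through. A hypothetical materialized pair:

    <arg>-datasource</arg>
    <arg>datasource-test</arg>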

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 87c55e3..6f2c480 100644
--- a/pom.xml
+++ b/pom.xml
@@ -307,6 +307,9 @@
                                 <exclude>**/maven-eclipse.xml</exclude>
                                 <exclude>**/.externalToolBuilders/**</exclude>
                                 <exclude>html5-ui/**</exclude>
+                                <exclude>**/db1.log</exclude>
+                                <exclude>**/db1.properties</exclude>
+                                <exclude>**/db1.script</exclude>
                             </excludes>
                         </configuration>
                         <executions>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 9e4dc8f..0999c36 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -568,6 +568,9 @@
 	          <exclude>**/data.txt</exclude>
 	          <exclude>**/maven-eclipse.xml</exclude>
 	          <exclude>**/.externalToolBuilders/**</exclude>
+                  <exclude>**/db1.log</exclude>
+                  <exclude>**/db1.properties</exclude>
+                  <exclude>**/db1.script</exclude>
                 </excludes>
               </configuration>
       	    </plugin>	

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
new file mode 100644
index 0000000..8cc1273
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.Map;
+
+/**
+ * Integration test for Feed Import.
+ */
+
+@Test
+public class FeedImportIT {
+    public static final Log LOG = LogFactory.getLog(FeedImportIT.class.getName());
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        HsqldbTestUtils.start();
+        HsqldbTestUtils.changeSAPassword("sqoop");
+        HsqldbTestUtils.createAndPopulateCustomerTable();
+
+        TestContext.cleanupStore();
+        TestContext.prepare();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        HsqldbTestUtils.tearDown();
+    }
+
+    @Test
+    public void testFeedImportHSql() throws Exception {
+        Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+    }
+
+    @Test
+    public void testSqoopImport() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE, overlay);
+        LOG.info("entity -submit -type datasource -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type datasource -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        LOG.info("entity -submitAndSchedule -type feed -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file "
+                + filePath));
+    }
+
+    @Test
+    public void testSqoopImportDeleteDatasource() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE, overlay);
+        LOG.info("entity -submit -type datasource -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type datasource -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        LOG.info("entity -submit -type feed -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file "
+                + filePath));
+
+        LOG.info("entity -delete -type datasource -name datasource-test");
+        Assert.assertEquals(-1, TestContext.executeWithURL("entity -delete -type datasource -name datasource-test"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index d067dee..0697b3d 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -90,6 +90,9 @@ import java.util.regex.Pattern;
 public class TestContext {
     public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
     public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
+    public static final String FEED_TEMPLATE3 = "/feed-template3.xml";
+
+    public static final String DATASOURCE_TEMPLATE = "/datasource-template.xml";
 
     public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
new file mode 100644
index 0000000..a92629f
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hsqldb.Server;
+
+/**
+ * Create a simple hsqldb server and schema to use for testing.
+ */
+public final class HsqldbTestUtils {
+
+    public static final Log LOG = LogFactory.getLog(HsqldbTestUtils.class.getName());
+
+    // singleton server instance.
+    private static Server server;
+
+    private static final String IN_MEM = "mem:/";
+
+    private static boolean inMemoryDB = IN_MEM.equals(getServerHost());
+
+    private HsqldbTestUtils() {}
+
+    public static String getServerHost() {
+        // Default to a local server; set -Dhsql.server.host=mem:/ for an in-memory database.
+        String host = System.getProperty("hsql.server.host", "localhost");
+        if (!host.endsWith("/")) { host += "/"; }
+        return host;
+    }
+
+    // Database name can be altered too
+    private static final String DATABASE_NAME = System.getProperty("hsql.database.name",  "db1");
+    private static final String CUSTOMER_TABLE_NAME = "CUSTOMER";
+    private static final String DB_URL = "jdbc:hsqldb:" + getServerHost() + DATABASE_NAME;
+    private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+    public static String getUrl() {
+        return DB_URL;
+    }
+
+    public static String getDatabaseName() {
+        return DATABASE_NAME;
+    }
+
+    /**
+     * start the server.
+     */
+    public static void start() {
+        if (null == server) {
+            LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME);
+            String tmpDir = System.getProperty("test.build.data", "/tmp/");
+            String dbLocation = tmpDir + "/falcon/testdb.file";
+            if (inMemoryDB) {dbLocation = IN_MEM; }
+            server = new Server();
+            server.setDatabaseName(0, DATABASE_NAME);
+            server.putPropertiesFromString("database.0=" + dbLocation
+                    + ";no_system_exit=true;");
+            server.start();
+            LOG.info("Started server with url=" + DB_URL);
+        }
+    }
+
+    public static void stop() {
+        if (null != server) {
+            server.stop();
+        }
+    }
+
+    public static void tearDown() throws SQLException {
+        dropExistingSchema();
+        stop();
+    }
+
+    public static void changeSAPassword(String passwd) throws Exception {
+        Connection connection = null;
+        Statement st = null;
+
+        LOG.info("Changing password for SA");
+        try {
+            connection = getConnectionSystem();
+
+            st = connection.createStatement();
+            st.executeUpdate("SET PASSWORD \"" + passwd + "\"");
+            connection.commit();
+        } finally {
+            if (null != st) {
+                st.close();
+            }
+
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+    private static Connection getConnectionSystem() throws SQLException {
+        return getConnection("SA", "");
+    }
+
+    private static Connection getConnection() throws SQLException {
+        return getConnection("SA", "sqoop");
+    }
+    private static Connection getConnection(String user, String password) throws SQLException {
+        try {
+            Class.forName(DRIVER_CLASS);
+        } catch (ClassNotFoundException cnfe) {
+            LOG.error("Could not get connection; driver class not found: "
+                    + DRIVER_CLASS);
+            return null;
+        }
+        Connection connection = DriverManager.getConnection(DB_URL, user, password);
+        connection.setAutoCommit(false);
+        return connection;
+    }
+
+    /**
+     * Returns database URL for the server instance.
+     * @return String representation of DB_URL
+     */
+    public static String getDbUrl() {
+        return DB_URL;
+    }
+
+    public static int getNumberOfRows() throws SQLException {
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnection();
+
+            st = connection.createStatement();
+            ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM " + CUSTOMER_TABLE_NAME);
+            int rowCount = 0;
+            if (rs.next()) {
+                rowCount = rs.getInt(1);
+            }
+            return rowCount;
+        } finally {
+            if (null != st) {
+                st.close();
+            }
+
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+
+    public static void createAndPopulateCustomerTable() throws SQLException, ClassNotFoundException {
+
+        LOG.info("createAndPopulateCustomerTable");
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnection();
+
+            st = connection.createStatement();
+            st.executeUpdate("DROP TABLE " + CUSTOMER_TABLE_NAME + " IF EXISTS");
+            st.executeUpdate("CREATE TABLE " + CUSTOMER_TABLE_NAME + "(id INT NOT NULL PRIMARY KEY, name VARCHAR(64))");
+
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(1, 'Apple')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(2, 'Blackberry')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(3, 'Caterpillar')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(4, 'DuPont')");
+
+            connection.commit();
+        } finally {
+            if (null != st) {
+                st.close();
+            }
+
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Delete any existing tables.
+     */
+    public static void dropExistingSchema() throws SQLException {
+        String [] tables = listTables();
+        if (null != tables) {
+            Connection conn = getConnection();
+            try {
+                for (String table : tables) {
+                    Statement s = conn.createStatement();
+                    try {
+                        s.executeUpdate("DROP TABLE " + table);
+                        conn.commit();
+                    } finally {
+                        s.close();
+                    }
+                }
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+    public static String[] listTables() {
+        Connection connection = null;
+        ResultSet results = null;
+        String [] tableTypes = {"TABLE"};
+        try {
+            try {
+                connection = getConnection();
+                DatabaseMetaData metaData = connection.getMetaData();
+                results = metaData.getTables(null, null, null, tableTypes);
+            } catch (SQLException sqlException) {
+                LOG.error("Error reading database metadata: "
+                        + sqlException.toString(), sqlException);
+                return null;
+            }
+
+            if (null == results) {
+                return null;
+            }
+
+            try {
+                ArrayList<String> tables = new ArrayList<String>();
+                while (results.next()) {
+                    String tableName = results.getString("TABLE_NAME");
+                    tables.add(tableName);
+                }
+
+                return tables.toArray(new String[0]);
+            } catch (SQLException sqlException) {
+                LOG.error("Error reading from database: "
+                        + sqlException.toString(), sqlException);
+                return null;
+            }
+        } finally {
+            // Close the ResultSet and the single connection that produced it.
+            try {
+                if (null != results) {
+                    results.close();
+                }
+                if (null != connection) {
+                    connection.close();
+                }
+            } catch (SQLException sqlE) {
+                LOG.error("Exception closing ResultSet: "
+                        + sqlE.toString(), sqlE);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/resources/datasource-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/datasource-template.xml b/webapp/src/test/resources/datasource-template.xml
new file mode 100644
index 0000000..fb7a329
--- /dev/null
+++ b/webapp/src/test/resources/datasource-template.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<datasource colo="##colo##" description="" type="hsql" name="datasource-test" xmlns="uri:falcon:datasource:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/resources/feed-template3.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-template3.xml b/webapp/src/test/resources/feed-template3.xml
new file mode 100644
index 0000000..a6c1d6b
--- /dev/null
+++ b/webapp/src/test/resources/feed-template3.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+    <groups>input</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="##cluster##" type="source">
+            <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
+            <retention limit="hours(24)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <import>
+                <source name="datasource-test" tableName="simple">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="##user##" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>


[3/3] falcon git commit: FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran

Posted by so...@apache.org.
FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/89040a29
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/89040a29
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/89040a29

Branch: refs/heads/master
Commit: 89040a296de3d4a9bd0aa2232342438add37afee
Parents: 35006fe
Author: Sowmya Ramesh <sr...@hortonworks.com>
Authored: Wed Oct 28 18:08:31 2015 -0700
Committer: Sowmya Ramesh <sr...@hortonworks.com>
Committed: Wed Oct 28 18:08:31 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../main/java/org/apache/falcon/LifeCycle.java  |   3 +-
 client/src/main/java/org/apache/falcon/Tag.java |   2 +-
 .../org/apache/falcon/entity/v0/EntityType.java |   8 +-
 .../falcon/metadata/RelationshipType.java       |   2 +
 client/src/main/resources/datasource-0.1.xsd    | 256 ++++++++++++++++++
 client/src/main/resources/feed-0.1.xsd          |  93 ++++++-
 client/src/main/resources/jaxb-binding.xjb      |   9 +
 client/src/main/resources/mysql_database.xml    |  46 ++++
 .../apache/falcon/entity/DatasourceHelper.java  | 199 ++++++++++++++
 .../org/apache/falcon/entity/EntityUtil.java    |  21 +-
 .../org/apache/falcon/entity/FeedHelper.java    | 191 +++++++++++++-
 .../entity/parser/DatasourceEntityParser.java   | 127 +++++++++
 .../entity/parser/EntityParserFactory.java      |   2 +
 .../falcon/entity/parser/FeedEntityParser.java  |  64 ++++-
 .../falcon/entity/store/ConfigurationStore.java |   2 +-
 .../apache/falcon/entity/v0/EntityGraph.java    |  11 +
 .../entity/v0/EntityIntegrityChecker.java       |   3 +
 .../EntityRelationshipGraphBuilder.java         |  33 +++
 .../InstanceRelationshipGraphBuilder.java       |  33 +++
 .../falcon/metadata/MetadataMappingService.java |   9 +-
 .../falcon/metadata/RelationshipLabel.java      |   1 +
 .../org/apache/falcon/util/HdfsClassLoader.java | 159 +++++++++++
 .../falcon/workflow/WorkflowExecutionArgs.java  |   1 +
 .../workflow/WorkflowExecutionContext.java      |   6 +-
 .../apache/falcon/entity/AbstractTestBase.java  |   1 +
 .../apache/falcon/entity/EntityTypeTest.java    |   3 +
 .../apache/falcon/entity/FeedHelperTest.java    | 109 +++++++-
 .../parser/DatasourceEntityParserTest.java      |  77 ++++++
 .../entity/parser/FeedEntityParserTest.java     | 159 ++++++++++-
 .../falcon/entity/v0/EntityGraphTest.java       | 124 ++++++++-
 .../config/datasource/datasource-0.1.xml        |  48 ++++
 .../config/datasource/datasource-file-0.1.xml   |  48 ++++
 .../datasource/datasource-invalid-0.1.xml       |  46 ++++
 .../resources/config/feed/feed-import-0.1.xml   |  74 ++++++
 .../feed/feed-import-exclude-fields-0.1.xml     |  74 ++++++
 .../config/feed/feed-import-invalid-0.1.xml     |  73 +++++
 .../config/feed/feed-import-noargs-0.1.xml      |  64 +++++
 docs/src/site/twiki/EntitySpecification.twiki   |  84 ++++++
 docs/src/site/twiki/FalconCLI.twiki             |  23 +-
 .../falcon/messaging/JMSMessageProducer.java    |   3 +-
 .../oozie/DatabaseImportWorkflowBuilder.java    | 174 ++++++++++++
 .../oozie/FeedImportCoordinatorBuilder.java     | 191 ++++++++++++++
 .../falcon/oozie/ImportWorkflowBuilder.java     |  84 ++++++
 .../falcon/oozie/OozieCoordinatorBuilder.java   |   3 +
 .../OozieOrchestrationWorkflowBuilder.java      |  12 +
 .../feed/FSReplicationWorkflowBuilder.java      |   3 +-
 .../falcon/oozie/feed/FeedBundleBuilder.java    |   5 +
 .../feed/FeedRetentionWorkflowBuilder.java      |   1 +
 .../feed/HCatReplicationWorkflowBuilder.java    |   3 +-
 .../ProcessExecutionWorkflowBuilder.java        |   2 +
 .../feed/import-sqoop-database-action.xml       |  47 ++++
 .../src/main/resources/action/post-process.xml  |   2 +
 pom.xml                                         |   3 +
 webapp/pom.xml                                  |   3 +
 .../apache/falcon/lifecycle/FeedImportIT.java   |  99 +++++++
 .../org/apache/falcon/resource/TestContext.java |   3 +
 .../org/apache/falcon/util/HsqldbTestUtils.java | 263 +++++++++++++++++++
 .../src/test/resources/datasource-template.xml  |  46 ++++
 webapp/src/test/resources/feed-template3.xml    |  59 +++++
 60 files changed, 3253 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1f30cf..b5980be 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1459 Ability to import from database (Venkat Ramachandran via Sowmya Ramesh)
+
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/LifeCycle.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/LifeCycle.java b/client/src/main/java/org/apache/falcon/LifeCycle.java
index 58a2a6c..d4d39e8 100644
--- a/client/src/main/java/org/apache/falcon/LifeCycle.java
+++ b/client/src/main/java/org/apache/falcon/LifeCycle.java
@@ -25,7 +25,8 @@ package org.apache.falcon;
 public enum LifeCycle {
     EXECUTION(Tag.DEFAULT),
     EVICTION(Tag.RETENTION),
-    REPLICATION(Tag.REPLICATION);
+    REPLICATION(Tag.REPLICATION),
+    IMPORT(Tag.IMPORT);
 
     private final Tag tag;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/Tag.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/Tag.java b/client/src/main/java/org/apache/falcon/Tag.java
index beeb812..5027ac0 100644
--- a/client/src/main/java/org/apache/falcon/Tag.java
+++ b/client/src/main/java/org/apache/falcon/Tag.java
@@ -24,7 +24,7 @@ import org.apache.falcon.entity.v0.EntityType;
  * Tag to include in the entity type.
  */
 public enum Tag {
-    DEFAULT(EntityType.PROCESS), RETENTION(EntityType.FEED), REPLICATION(EntityType.FEED);
+    DEFAULT(EntityType.PROCESS), RETENTION(EntityType.FEED), REPLICATION(EntityType.FEED), IMPORT(EntityType.FEED);
 
     private final EntityType entityType;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
index 0657124..3d55547 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
@@ -21,6 +21,7 @@ package org.apache.falcon.entity.v0;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
@@ -38,7 +39,8 @@ import java.util.Arrays;
 public enum EntityType {
     FEED(Feed.class, "/feed-0.1.xsd", "name"),
     PROCESS(Process.class, "/process-0.1.xsd", "name"),
-    CLUSTER(Cluster.class, "/cluster-0.1.xsd", "name");
+    CLUSTER(Cluster.class, "/cluster-0.1.xsd", "name"),
+    DATASOURCE(Datasource.class, "/datasource-0.1.xsd", "name");
 
     //Fail unmarshalling of whole xml if unmarshalling of any element fails
     private static class EventHandler implements ValidationEventHandler {
@@ -93,8 +95,10 @@ public enum EntityType {
         return unmarshaller;
     }
 
+
     public boolean isSchedulable() {
-        return this != EntityType.CLUSTER;
+        // Cluster and Datasource are not schedulable like Feed and Process
+        return ((this != EntityType.CLUSTER) && (this != EntityType.DATASOURCE));
     }
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP"})

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
index f034772..8e5f8ea 100644
--- a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
+++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
@@ -27,10 +27,12 @@ public enum RelationshipType {
     CLUSTER_ENTITY("cluster-entity"),
     FEED_ENTITY("feed-entity"),
     PROCESS_ENTITY("process-entity"),
+    DATASOURCE_ENTITY("datasource-entity"),
 
     // instance vertex types
     FEED_INSTANCE("feed-instance"),
     PROCESS_INSTANCE("process-instance"),
+    IMPORT_INSTANCE("import-instance"),
 
     // Misc vertex types
     USER("user"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/datasource-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/datasource-0.1.xsd b/client/src/main/resources/datasource-0.1.xsd
new file mode 100644
index 0000000..beb82cc
--- /dev/null
+++ b/client/src/main/resources/datasource-0.1.xsd
@@ -0,0 +1,256 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" attributeFormDefault="unqualified" elementFormDefault="qualified"
+           targetNamespace="uri:falcon:datasource:0.1" xmlns="uri:falcon:datasource:0.1"
+           xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" jaxb:version="2.1">
+    <xs:annotation>
+        <xs:documentation>
+            Licensed to the Apache Software Foundation (ASF) under one or more
+            contributor license agreements. See the NOTICE file distributed with
+            this work for additional information regarding copyright ownership.
+            The ASF licenses this file to You under the Apache License, Version
+            2.0
+            (the "License"); you may not use this file except in compliance with
+            the License. You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+            Unless required by applicable law or agreed to in writing, software
+            distributed under the License is distributed on an "AS IS" BASIS,
+            WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+            implied.
+            See the License for the specific language governing permissions and
+            limitations under the License.
+        </xs:documentation>
+        <xs:appinfo>
+            <jaxb:schemaBindings>
+                <jaxb:package name="org.apache.falcon.entity.v0.datasource"/>
+            </jaxb:schemaBindings>
+        </xs:appinfo>
+    </xs:annotation>
+    <xs:element name="datasource" type="datasource">
+    </xs:element>
+    <xs:complexType name="datasource">
+        <xs:annotation>
+            <xs:documentation>A datasource contains the information required
+                to connect to a data source such as a MySQL database or a Kafka cluster.
+                A datasource is referenced by feeds that represent an object such as
+                a table (or topic) in the MySQL database (or Kafka cluster).
+                name: the name of the datasource, which must be unique.
+                colo: the name of the colo to which this datasource belongs.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="KEY_VALUE_PAIR" name="tags" minOccurs="0" maxOccurs="1">
+                <xs:annotation>
+                    <xs:documentation>
+                        tags: an optional list of comma-separated key-value pairs
+                        used to classify the datasource entity.
+                        Example: consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+            <xs:element type="interfaces" name="interfaces"/>
+            <xs:element type="driver" name="driver" minOccurs="1" maxOccurs="1" />
+            <xs:element type="properties" name="properties" minOccurs="0"/>
+            <xs:element type="ACL" name="ACL" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute type="IDENTIFIER" name="name" use="required"/>
+        <xs:attribute type="xs:string"  name="colo" use="required"/>
+        <xs:attribute type="xs:string"  name="description"/>
+        <xs:attribute type="datasource-type"  name="type" use="required">
+            <xs:annotation>
+                <xs:documentation>
+                    The datasource type can be a relational database (MySQL, Oracle, etc.)
+                    or a messaging system such as Kafka.
+                </xs:documentation>
+            </xs:annotation>
+        </xs:attribute>
+    </xs:complexType>
+    <xs:complexType name="property">
+        <xs:annotation>
+            <xs:documentation>
+                A key-value pair to pass in any datasource specific properties.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="name" use="required"/>
+        <xs:attribute type="xs:string" name="value" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="interface">
+        <xs:annotation>
+            <xs:documentation>
+                An interface specifies the interface type (readonly or write) and an
+                endpoint URL. Falcon uses these endpoints to import data from or
+                export data to datasources.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="driver" name="driver" minOccurs="0" maxOccurs="1" />
+            <xs:element type="credential" name="credential" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="properties" name="properties" minOccurs="0"/>
+        </xs:sequence>
+        <xs:attribute type="interfacetype" name="type" use="required"/>
+        <xs:attribute type="xs:string" name="endpoint" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="properties">
+        <xs:annotation>
+            <xs:documentation>
+                A list of property elements.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="property" name="property" maxOccurs="unbounded" minOccurs="0"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="interfaces">
+        <xs:annotation>
+            <xs:documentation>
+                A list of interfaces.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="interface" name="interface" maxOccurs="2" minOccurs="1"/>
+            <xs:element type="credential" name="credential" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:simpleType name="interfacetype">
+        <xs:annotation>
+            <xs:documentation>
+                A datasource interface has two types: readonly and write.
+                The readonly endpoint specifies the URL/mechanism to use for the data IMPORT
+                operation from a datasource, while the write endpoint specifies the URL/mechanism
+                to use for the data EXPORT operation.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="readonly"/>
+            <xs:enumeration value="write"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:simpleType name="IDENTIFIER">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="(([a-zA-Z]([\-a-zA-Z0-9])*){1,39})"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:simpleType name="KEY_VALUE_PAIR">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="([\w_]+=[^,]+)?([,]?[ ]*[\w_]+=[^,]+)*"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:complexType name="credential">
+        <xs:sequence  minOccurs="1" maxOccurs="1" >
+            <xs:element name="userName" minOccurs="1" maxOccurs="1" type="xs:string">
+                <xs:annotation>
+                    <xs:documentation>
+                        The User for the datasource.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+
+            <xs:choice minOccurs="1" maxOccurs="1">
+                <xs:element name="passwordFile" type="xs:string">
+                    <xs:annotation>
+                        <xs:documentation>
+                            The fully qualified path to a file on HDFS, with 400
+                            permissions, containing the datasource server password.
+                            Only the user submitting the job has read access to this
+                            file, which is securely passed to the mappers.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+
+                <xs:element name="passwordText" type="xs:string">
+                    <xs:annotation>
+                        <xs:documentation>
+                            Plain text password.
+                        </xs:documentation>
+                    </xs:annotation>
+                </xs:element>
+            </xs:choice>
+        </xs:sequence>
+        <xs:attribute name="type" type="credentialtype" use="required"/>
+    </xs:complexType>
+
+    <xs:simpleType name="credentialtype">
+        <xs:annotation>
+            <xs:documentation>
+                Only user-password credentials are supported today; this can be extended.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="password-file" />
+            <xs:enumeration value="password-text" />
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:simpleType name="datasource-type">
+        <xs:annotation>
+            <xs:documentation>
+                The supported datasource types are mysql, oracle, and hsql; others
+                (such as Teradata) may be added later.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="mysql"/>
+            <xs:enumeration value="oracle"/>
+            <xs:enumeration value="hsql"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:complexType name="driver">
+        <xs:annotation>
+            <xs:documentation>
+                Driver information.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence minOccurs="1" maxOccurs="1">
+            <xs:element type="xs:string" name="clazz" minOccurs="1" maxOccurs="1">
+                <xs:annotation>
+                    <xs:documentation>
+                        Fully qualified class name for the datasource driver used
+                        for validating the datasource connection in Falcon.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+            <xs:element type="xs:string" name="jar" minOccurs="1" maxOccurs="unbounded">
+                <xs:annotation>
+                    <xs:documentation>
+                        Path to the connector jar files on HDFS that are shipped with the workflow.
+                        The connector jars must also be placed in the Oozie sharelib; since this
+                        uses the latest Sqoop 1.x features, a 1.5 snapshot build is required.
+                    </xs:documentation>
+                </xs:annotation>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="ACL">
+        <xs:annotation>
+            <xs:documentation>
+                Access control list for this datasource.
+                owner is the owner of this entity.
+                group is the group with read access; not used at this time.
+                permission is not enforced at this time.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="owner"/>
+        <xs:attribute type="xs:string" name="group"/>
+        <xs:attribute type="xs:string" name="permission" default="*"/>
+    </xs:complexType>
+</xs:schema>
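
Putting the schema together, a minimal instance document that should validate
against it, shown with the password-file credential variant and the required
driver element; the entity name, endpoint, and HDFS paths below are
hypothetical, not fixtures from this commit:

    <datasource colo="west-coast" description="" type="mysql" name="mysql-sample-ds" xmlns="uri:falcon:datasource:0.1">
        <interfaces>
            <interface type="readonly" endpoint="jdbc:mysql://dbhost/test">
                <credential type="password-file">
                    <userName>import_user</userName>
                    <passwordFile>/user/falcon/password-store/import_user</passwordFile>
                </credential>
            </interface>
        </interfaces>
        <driver>
            <clazz>com.mysql.jdbc.Driver</clazz>
            <jar>/apps/falcon/libs/mysql-connector-java.jar</jar>
        </driver>
    </datasource>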

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 77b8f4b..2974dd6 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -130,7 +130,6 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string" name="description"/>
     </xs:complexType>
-
     <xs:complexType name="cluster">
         <xs:annotation>
             <xs:documentation>
@@ -157,6 +156,7 @@
             <xs:element type="validity" name="validity"/>
             <xs:element type="retention" name="retention"/>
             <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="import" name="import" minOccurs="0" maxOccurs="1"/>
             <xs:choice minOccurs="0" maxOccurs="1">
                 <xs:element type="locations" name="locations" minOccurs="0"/>
                 <xs:element type="catalog-table" name="table"/>
@@ -166,8 +166,7 @@
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
         <xs:attribute type="xs:string" name="partition" use="optional"/>
-        <xs:attribute type="frequency-type" name="delay" use="optional" /> 
-
+        <xs:attribute type="frequency-type" name="delay" use="optional" />
     </xs:complexType>
     <xs:complexType name="partitions">
         <xs:annotation>
@@ -301,7 +300,6 @@
     <xs:complexType name="partition">
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
     </xs:complexType>
-
     <xs:complexType name="notification">
         <xs:annotation>
             <xs:documentation>
@@ -331,7 +329,6 @@
         </xs:attribute>
         <xs:attribute type="xs:string" name="to" use="required"/>
     </xs:complexType>
-
     <xs:complexType name="ACL">
         <xs:annotation>
             <xs:documentation>
@@ -451,7 +448,90 @@
             <xs:minLength value="1"/>
         </xs:restriction>
     </xs:simpleType>
-
+    <xs:complexType name="import">
+       <xs:sequence>
+            <xs:element type="source" name="source"/>
+            <xs:element type="arguments" name="arguments" minOccurs="0"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="source">
+        <xs:annotation>
+            <xs:documentation>
+                Specifies the name of the source entity from which data will be imported.
+                Today this is a database; other data source types may be supported in the future.
+                tableName specifies the table to import.
+                The extract type specifies the extraction method (full or incremental).
+                deltacolumn specifies the column on the source database table used
+                to identify the new data since the last extraction.
+                The merge type specifies how the data is organized on Hadoop.
+                The supported types are snapshot (a point-in-time copy) and append
+                (time-series partitions).
+            </xs:documentation>
+        </xs:annotation>
+       <xs:sequence>
+            <xs:element type="extract" name="extract" minOccurs="1"/>
+            <xs:element type="fields-type" name="fields" minOccurs="0"/>
+        </xs:sequence>
+        <xs:attribute type="non-empty-string" name="name" use="required"/>
+        <xs:attribute type="non-empty-string" name="tableName" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="extract">
+        <xs:sequence>
+            <xs:element type="xs:string" name="deltacolumn" minOccurs="0" maxOccurs="1"/>
+            <xs:element type="merge-type" name="mergepolicy" minOccurs="1" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute type="extract-method" name="type" use="required"/>
+    </xs:complexType>
+    <xs:simpleType name="extract-method">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="full"/>
+            <xs:enumeration value="incremental"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:simpleType name="merge-type">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="snapshot"/>
+            <xs:enumeration value="append"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:complexType name="fields-type">
+        <xs:annotation>
+            <xs:documentation>
+                Specifies either an includes or an excludes field list. If an includes list is specified,
+                only the listed fields are imported. If an excludes list is specified, all fields except
+                the listed ones are imported from the datasource to HDFS.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:choice minOccurs="1" maxOccurs="1">
+            <xs:element type="field-include-exclude" name="includes"/>
+            <xs:element type="field-include-exclude" name="excludes"/>
+        </xs:choice>
+    </xs:complexType>
+    <xs:complexType name="field-include-exclude">
+        <xs:sequence>
+            <xs:element type="xs:string" name="field" maxOccurs="unbounded" minOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="arguments">
+        <xs:annotation>
+            <xs:documentation>
+                A list of name-value pairs of extra arguments to be passed to the concrete implementation.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:sequence>
+            <xs:element type="argument" name="argument" maxOccurs="unbounded" minOccurs="0"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="argument">
+        <xs:annotation>
+            <xs:documentation>
+                A key-value pair used when invoking the
+                ingestion engine.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="name" use="required"/>
+        <xs:attribute type="xs:string" name="value" use="required"/>
+    </xs:complexType>
     <xs:complexType name="retention-stage">
         <xs:annotation>
             <xs:documentation>
@@ -469,5 +549,4 @@
             <xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element>
         </xs:all>
     </xs:complexType>
-
 </xs:schema>
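
The import types above also admit an incremental extract with an excludes
field list, a path not exercised by feed-template3.xml in this commit. A
hedged sketch of such a cluster-level import block (the datasource, table,
and column names are assumptions):

    <import>
        <source name="mysql-db" tableName="customer">
            <extract type="incremental">
                <deltacolumn>modified_time</deltacolumn>
                <mergepolicy>append</mergepolicy>
            </extract>
            <fields>
                <excludes>
                    <field>ssn</field>
                </excludes>
            </fields>
        </source>
        <arguments>
            <argument name="--split-by" value="id"/>
        </arguments>
    </import>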

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/client/src/main/resources/jaxb-binding.xjb b/client/src/main/resources/jaxb-binding.xjb
index 6f1d6c7..978145f 100644
--- a/client/src/main/resources/jaxb-binding.xjb
+++ b/client/src/main/resources/jaxb-binding.xjb
@@ -56,6 +56,15 @@
         <inheritance:extends>org.apache.falcon.entity.v0.EntityNotification</inheritance:extends>
     </jaxb:bindings>
 
+
+    <jaxb:bindings schemaLocation="datasource-0.1.xsd" node="//xs:complexType[@name='datasource']">
+         <inheritance:extends>org.apache.falcon.entity.v0.Entity</inheritance:extends>
+    </jaxb:bindings>
+
+    <jaxb:bindings schemaLocation="datasource-0.1.xsd" node="//xs:complexType[@name='ACL']">
+        <inheritance:extends>org.apache.falcon.entity.v0.AccessControlList</inheritance:extends>
+    </jaxb:bindings>
+
     <jaxb:globalBindings>
         <xjc:simple/>
     </jaxb:globalBindings>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/client/src/main/resources/mysql_database.xml
----------------------------------------------------------------------
diff --git a/client/src/main/resources/mysql_database.xml b/client/src/main/resources/mysql_database.xml
new file mode 100644
index 0000000..5f88ba4
--- /dev/null
+++ b/client/src/main/resources/mysql_database.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+
+        <!-- ***** read interface ***** -->
+        <interface type="readonly" endpoint="jdbc:mysql://c6402/test">
+            <credential type="password-file">
+                <userName>sqoop_user</userName>
+                <passwordFile>/user/ambari-qa/password-store/password_read_user</passwordFile>
+            </credential>
+        </interface>
+
+        <!-- ***** write interface ***** -->
+        <interface type="write"  endpoint="jdbc:mysql://c6402/test">
+            <credential type="password-file">
+                <userName>sqoop2_user</userName>
+                <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
+            </credential>
+        </interface>
+
+        <!-- ***** default credential ***** -->
+        <credential type="password-file">
+            <userName>sqoop2_user</userName>
+            <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
+        </credential>
+
+    </interfaces>
+</database>
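
A note on the password-file credentials above: the referenced path points at an
HDFS file whose entire contents are used as the password. A minimal,
hypothetical sketch of provisioning such a file (the path mirrors the example
above; the class name and secret are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public final class PasswordFileSetup {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            Path pwdFile = new Path("/user/ambari-qa/password-store/password_read_user");
            try (FSDataOutputStream out = fs.create(pwdFile, true)) {
                // The file contents are read verbatim as the password,
                // so avoid a trailing newline.
                out.writeBytes("my-secret");
            }
            fs.setPermission(pwdFile, new FsPermission((short) 0400)); // owner read-only
        }
    }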

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
new file mode 100644
index 0000000..f9b3966
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.datasource.Credential;
+import org.apache.falcon.entity.v0.datasource.Credentialtype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
+import org.apache.falcon.entity.v0.datasource.Interface;
+import org.apache.falcon.entity.v0.datasource.Interfaces;
+import org.apache.falcon.entity.v0.datasource.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+
+/**
+ * DataSource entity helper methods.
+ */
+public final class DatasourceHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DatasourceHelper.class);
+
+    private static final ConfigurationStore STORE = ConfigurationStore.get();
+
+    public static DatasourceType getImportSourceType(Cluster feedCluster) throws FalconException {
+        Datasource ds = STORE.get(EntityType.DATASOURCE, feedCluster.getImport().getSource().getName());
+        return ds.getType();
+    }
+
+    private DatasourceHelper() {}
+
+    public static Datasource getDatasource(Cluster feedCluster) throws FalconException {
+        return STORE.get(EntityType.DATASOURCE, feedCluster.getImport().getSource().getName());
+    }
+    public static String getReadOnlyEndpoint(Datasource db) {
+        return getInterface(db, Interfacetype.READONLY);
+    }
+
+    /**
+     * Returns user name and password pair as it is specified in the XML. If the credential type is
+     * password-file, the path name is returned.
+     *
+     * @param db
+     * @return user name and password pair
+     * @throws FalconException
+     */
+    public static Pair<String, String> getReadPasswordInfo(Datasource db) throws FalconException {
+        for (Interface ifs : db.getInterfaces().getInterfaces()) {
+            if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) {
+                return getPasswordInfo(ifs.getCredential());
+            }
+        }
+        return getDefaultPasswordInfo(db.getInterfaces());
+    }
+
+    /**
+     * Returns the user name and resolved password. If the credential type is password-file, the
+     * password is read from the HDFS file. If the credential type is password-text, the clear-text
+     * password is returned as-is.
+     *
+     * @param db
+     * @return properties holding the "user" and "password" keys
+     * @throws FalconException
+     */
+    public static java.util.Properties fetchReadPasswordInfo(Datasource db) throws FalconException {
+        Pair<String, String> passwdInfo = getReadPasswordInfo(db);
+        java.util.Properties p = new java.util.Properties();
+        p.put("user", passwdInfo.first);
+        p.put("password", passwdInfo.second);
+        if (getReadPasswordType(db) == Credentialtype.PASSWORD_FILE) {
+            String actualPasswd = readPasswordInfoFromFile(passwdInfo.second);
+            p.put("password", actualPasswd);
+        }
+        return p;
+    }
+
+    /**
+     * Given a Datasource, returns the read-only credential type. If the read-only credential
+     * is missing, the interfaces' default credential is used.
+     *
+     * @param db
+     * @return Credentialtype
+     * @throws FalconException
+     */
+    public static Credentialtype getReadPasswordType(Datasource db) throws FalconException {
+        for (Interface ifs : db.getInterfaces().getInterfaces()) {
+            if ((ifs.getType() == Interfacetype.READONLY) && (ifs.getCredential() != null)) {
+                return getPasswordType(ifs.getCredential());
+            }
+        }
+        return getDefaultPasswordType(db.getInterfaces());
+    }
+
+    /**
+     * Returns the interface endpoint for the interface type specified in the argument.
+     *
+     * @param db
+     * @param type - readonly or write
+     * @return the endpoint, or null if no interface of the given type exists
+     */
+    private static String getInterface(Datasource db, Interfacetype type) {
+        for (Interface ifs : db.getInterfaces().getInterfaces()) {
+            if (ifs.getType() == type) {
+                return ifs.getEndpoint();
+            }
+        }
+        return null;
+    }
+    private static Credentialtype getPasswordType(Credential c) {
+        return c.getType();
+    }
+
+    private static Credentialtype getDefaultPasswordType(Interfaces ifs) throws FalconException {
+
+        if (ifs.getCredential() != null) {
+            return ifs.getCredential().getType();
+        } else {
+            throw new FalconException("Missing Interfaces default credential");
+        }
+    }
+
+    private static Pair<String, String> getDefaultPasswordInfo(Interfaces ifs) throws FalconException {
+
+        if (ifs.getCredential() != null) {
+            return getPasswordInfo(ifs.getCredential());
+        } else {
+            throw new FalconException("Missing Interfaces default credential");
+        }
+    }
+
+    private static Pair<String, String> getPasswordInfo(Credential c) throws FalconException {
+        String passwd = null;
+        if (c.getType() == Credentialtype.PASSWORD_FILE) {
+            passwd = c.getPasswordFile();
+        } else {
+            passwd = c.getPasswordText();
+        }
+        return new Pair<String, String>(c.getUserName(), passwd);
+    }
+
+    private static String readPasswordInfoFromFile(String passwordFilePath) throws FalconException {
+        try {
+            Path path = new Path(passwordFilePath);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
+            if (!fs.exists(path)) {
+                throw new IOException("The password file does not exist! "
+                        + passwordFilePath);
+            }
+
+            if (!fs.isFile(path)) {
+                throw new IOException("The password file cannot be a directory! "
+                        + passwordFilePath);
+            }
+
+            InputStream is = fs.open(path);
+            StringWriter writer = new StringWriter();
+            try {
+                IOUtils.copy(is, writer);
+                return writer.toString();
+            } finally {
+                IOUtils.closeQuietly(is);
+                IOUtils.closeQuietly(writer);
+                fs.close();
+            }
+        } catch (IOException ioe) {
+            LOG.error("Error reading password file from HDFS : " + ioe);
+            throw new FalconException(ioe);
+        }
+    }
+}
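
To see how these helpers fit together, here is a minimal sketch (assuming the
"mysql-db" entity above has been submitted and the JDBC driver is on the
classpath; the class name is illustrative) that resolves the read credential
and opens a connection:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    import org.apache.falcon.entity.DatasourceHelper;
    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.datasource.Datasource;

    public final class DatasourceLookupSketch {
        public static void main(String[] args) throws Exception {
            Datasource db = ConfigurationStore.get().get(EntityType.DATASOURCE, "mysql-db");
            String endpoint = DatasourceHelper.getReadOnlyEndpoint(db); // jdbc:mysql://c6402/test
            // For password-file credentials this resolves the password from HDFS.
            Properties creds = DatasourceHelper.fetchReadPasswordInfo(db);
            Connection con = DriverManager.getConnection(endpoint, creds);
            con.close();
        }
    }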

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index ceefb17..66dba6f 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -35,6 +35,7 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
@@ -681,7 +682,7 @@ public final class EntityUtil {
     //Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
     //Each entity update creates a new staging path
     //Base staging path is the base path for all staging dirs
-    public static Path getBaseStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+    public static Path getBaseStagingPath(Cluster cluster, Entity entity) {
         return new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(),
                 "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
     }
@@ -723,7 +724,7 @@ public final class EntityUtil {
 
     //Creates new staging path for entity schedule/update
     //Staging path containd md5 of the cluster view of the entity. This is required to check if update is required
-    public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
+    public static Path getNewStagingPath(Cluster cluster, Entity entity)
         throws FalconException {
         Entity clusterView = getClusterView(entity, cluster.getName());
         return new Path(getBaseStagingPath(cluster, entity),
@@ -778,7 +779,7 @@ public final class EntityUtil {
         }
     }
 
-    public static Path getLogPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) {
+    public static Path getLogPath(Cluster cluster, Entity entity) {
         return new Path(getBaseStagingPath(cluster, entity), "logs");
     }
 
@@ -1001,6 +1002,20 @@ public final class EntityUtil {
         return result;
     }
 
+    /**
+     * Returns the data source type for a feed with an import policy.
+     *
+     * @param cluster
+     * @param feed
+     * @return DatasourceType of the feed's import source
+     * @throws FalconException
+     */
+    public static DatasourceType getImportDatasourceType(
+            Cluster cluster, Feed feed) throws FalconException {
+        return FeedHelper.getImportDatasourceType(cluster, feed);
+    }
+
     public static EntityNotification getEntityNotification(Entity entity) {
         switch (entity.getEntityType()) {
         case FEED:

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 5c252a8..2c65eba 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -26,14 +26,18 @@ import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
+import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
 import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
@@ -301,7 +305,7 @@ public final class FeedHelper {
         clusterVars.put("colo", cluster.getColo());
         clusterVars.put("name", cluster.getName());
         if (cluster.getProperties() != null) {
-            for (Property property : cluster.getProperties().getProperties()) {
+            for (org.apache.falcon.entity.v0.cluster.Property property : cluster.getProperties().getProperties()) {
                 clusterVars.put(property.getName(), property.getValue());
             }
         }
@@ -786,6 +790,184 @@ public final class FeedHelper {
         return result;
     }
 
+
+    /**
+     * Returns the data source type associated with the Feed's import policy.
+     *
+     * @param clusterEntity
+     * @param feed
+     * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType}
+     * @throws FalconException
+     */
+    public static DatasourceType getImportDatasourceType(
+            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+            Feed feed) throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
+        return DatasourceHelper.getImportSourceType(feedCluster);
+    }
+
+    /**
+     * Returns true if an import policy is enabled in the feed's cluster definition.
+     *
+     * @param feedCluster
+     * @return true if an import policy is enabled, false otherwise
+     */
+    public static boolean isImportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (feedCluster.getType() == ClusterType.SOURCE) {
+            return (feedCluster.getImport() != null);
+        }
+        return false;
+    }
+
+    /**
+     * Returns the data source name associated with the Feed's import policy.
+     *
+     * @param feedCluster
+     * @return DataSource name defined in the Datasource Entity
+     */
+    public static String getImportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getName();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns Datasource table name.
+     *
+     * @param feedCluster
+     * @return Table or Topic name of the Datasource
+     */
+    public static String getImportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getTableName();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the extract method type.
+     *
+     * @param feedCluster
+     * @return {@link org.apache.falcon.entity.v0.feed.ExtractMethod}
+     */
+    public static ExtractMethod getImportExtractMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getExtract().getType();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the merge type of the Feed import policy.
+     *
+     * @param feedCluster
+     * @return {@link org.apache.falcon.entity.v0.feed.MergeType}
+     */
+    public static MergeType getImportMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        if (isImportEnabled(feedCluster)) {
+            return feedCluster.getImport().getSource().getExtract().getMergepolicy();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the initial instance date for the import data set or coordinator.
+     *
+     * For the snapshot merge type, the current time is used since the source data is dumped whole.
+     * For the incremental merge type, the start date specified in the cluster validity is used.
+     *
+     * @param feedCluster
+     * @return Feed cluster validity start date or recent time
+     */
+    public static Date getImportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        Date initialInstance = new Date();
+        if (!FeedHelper.isSnapshotMergeType(feedCluster)) {
+            initialInstance = feedCluster.getValidity().getStart();
+        }
+        return initialInstance;
+    }
+
+    /**
+     * Helper method to check if the merge type is snapshot.
+     *
+     * @param feedCluster
+     * @return true if the feed import policy merge type is snapshot
+     *
+     */
+    public static boolean isSnapshotMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) {
+        return MergeType.SNAPSHOT == getImportMergeType(feedCluster);
+    }
+
+    /**
+     * Returns extra arguments specified in the Feed import policy.
+     *
+     * @param feedCluster
+     * @return
+     * @throws FalconException
+     */
+    public static Map<String, String> getImportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+        throws FalconException {
+
+        Map<String, String> argsMap = new HashMap<String, String>();
+        if (feedCluster.getImport().getArguments() == null) {
+            return argsMap;
+        }
+
+        for (org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getImport().getArguments().getArguments()) {
+            argsMap.put(p.getName().toLowerCase(), p.getValue());
+        }
+        return argsMap;
+    }
+
+    /**
+     * Returns Fields list specified in the Import Policy.
+     *
+     * @param feedCluster
+     * @return List of String
+     * @throws FalconException
+     */
+    public static List<String> getFieldList(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+        throws FalconException {
+        if (feedCluster.getImport().getSource().getFields() == null) {
+            return null;
+        }
+        org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields();
+        FieldIncludeExclude includeFields = fieldType.getIncludes();
+        if (includeFields == null) {
+            return null;
+        }
+        return includeFields.getFields();
+    }
+
+
+    /**
+     * Returns true if an exclude field list is used. This is a TBD feature.
+     *
+     * @param feedCluster
+     * @return true if an exclude field list is used, false otherwise.
+     * @throws FalconException
+     */
+    public static boolean isFieldExcludes(org.apache.falcon.entity.v0.feed.Cluster feedCluster)
+        throws FalconException {
+        if (feedCluster.getImport().getSource().getFields() != null) {
+            org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields();
+            FieldIncludeExclude excludeFields = fieldType.getExcludes();
+            if ((excludeFields != null) && (excludeFields.getFields().size() > 0)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName,
                                                                               Date instanceTime)
         throws FalconException {
@@ -813,5 +995,4 @@ public final class FeedHelper {
         }
         return  retentionFrequency;
     }
-
 }
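
Taken together, the new accessors let callers interrogate a feed cluster's
import policy without touching the JAXB model directly. A short sketch (the
feed and cluster are assumed to exist; the class name is illustrative):

    import java.util.Date;
    import java.util.Map;

    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.v0.feed.Cluster;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.entity.v0.feed.MergeType;

    public final class ImportPolicySketch {
        static void describe(Feed feed, String clusterName) throws Exception {
            Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
            if (!FeedHelper.isImportEnabled(feedCluster)) {
                return;
            }
            String source = FeedHelper.getImportDatasourceName(feedCluster); // e.g. "mysql-db"
            String table = FeedHelper.getImportDataSourceTableName(feedCluster);
            MergeType merge = FeedHelper.getImportMergeType(feedCluster);
            // SNAPSHOT re-imports the whole table, so the first instance is "now";
            // otherwise the cluster validity start date is used.
            Date first = FeedHelper.getImportInitalInstance(feedCluster);
            Map<String, String> sqoopArgs = FeedHelper.getImportArguments(feedCluster);
            System.out.printf("%s imports %s.%s (%s), first=%s, args=%s%n",
                    feed.getName(), source, table, merge, first, sqoopArgs);
        }
    }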

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
new file mode 100644
index 0000000..e58b1e9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity.parser;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.DatasourceHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.datasource.ACL;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.datasource.Interfacetype;
+import org.apache.falcon.util.HdfsClassLoader;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Parser for DataSource entity definition.
+ */
+public class DatasourceEntityParser extends EntityParser<Datasource> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DatasourceEntityParser.class);
+
+    public DatasourceEntityParser() {
+        super(EntityType.DATASOURCE);
+    }
+
+    @Override
+    public void validate(Datasource db) throws FalconException {
+        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars());
+            Thread.currentThread().setContextClassLoader(hdfsClassLoader);
+            validateInterface(db, Interfacetype.READONLY, hdfsClassLoader);
+            validateInterface(db, Interfacetype.WRITE, hdfsClassLoader);
+            validateACL(db);
+        } catch(IOException io) {
+            throw new ValidationException("Unable to copy driver jars to local dir: "
+                    + Arrays.toString(db.getDriver().getJars().toArray()));
+        } finally {
+            Thread.currentThread().setContextClassLoader(previousClassLoader);
+        }
+    }
+
+    private static void validateInterface(Datasource db, Interfacetype interfacetype, ClassLoader hdfsClassLoader)
+        throws ValidationException {
+        String endpoint = null;
+        try {
+            endpoint = DatasourceHelper.getReadOnlyEndpoint(db);
+            if (StringUtils.isNotBlank(endpoint)) {
+                LOG.info("Validating {0} endpoint {1} connection.", interfacetype.value(), endpoint);
+                Properties userPasswdInfo = DatasourceHelper.fetchReadPasswordInfo(db);
+                validateConnection(hdfsClassLoader, db.getDriver().getClazz(), endpoint, userPasswdInfo);
+            }
+        } catch(FalconException fe) {
+            throw new ValidationException(String.format("Cannot validate '%s' "
+                            + "interface '%s' " + "of database entity '%s' due to '%s' ",
+                   interfacetype, endpoint,
+                   db.getName(), fe.getMessage()));
+        }
+    }
+
+    private static void validateConnection(ClassLoader hdfsClassLoader, String driverClass,
+                                    String connectUrl, Properties userPasswdInfo)
+        throws FalconException {
+        try {
+            java.sql.Driver driver = (java.sql.Driver) hdfsClassLoader.loadClass(driverClass).newInstance();
+            LOG.info("Validating connection URL: {0} using driver: {1}", connectUrl, driver.getClass().toString());
+            Connection con = driver.connect(connectUrl, userPasswdInfo);
+            if (con == null) {
+                throw new FalconException("DriverManager.getConnection() return "
+                       + "null for URL : " + connectUrl);
+            }
+        } catch (Exception ex) {
+            LOG.error("Exception while validating connection : ", ex);
+            throw new FalconException(ex);
+        }
+    }
+
+    /**
+     * Validate ACL if authorization is enabled.
+     *
+     * @param  db database entity
+     * @throws ValidationException
+     */
+    private void validateACL(Datasource db) throws ValidationException {
+        if (isAuthorizationDisabled) {
+            return;
+        }
+
+        // Validate the entity owner is logged-in, authenticated user if authorization is enabled
+        final ACL dbACL = db.getACL();
+        if (dbACL == null) {
+            throw new ValidationException("Datasource ACL cannot be empty for:  " + db.getName());
+        }
+
+        validateACLOwnerAndGroup(dbACL);
+
+        try {
+            authorize(db.getName(), dbACL);
+        } catch (AuthorizationException e) {
+            throw new ValidationException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
index 5a33201..b497770 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
@@ -45,6 +45,8 @@ public final class EntityParserFactory {
             return new FeedEntityParser();
         case CLUSTER:
             return new ClusterEntityParser();
+        case DATASOURCE:
+            return new DatasourceEntityParser();
         default:
             throw new IllegalArgumentException("Unhandled entity type: " + entityType);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index c5cfdd2..c70f18d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -33,11 +33,14 @@ import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.ACL;
+import org.apache.falcon.entity.v0.feed.Extract;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Sla;
@@ -55,8 +58,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.TimeZone;
 
@@ -95,6 +99,12 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     cluster.getName());
             validateClusterHasRegistry(feed, cluster);
             validateFeedCutOffPeriod(feed, cluster);
+            if (FeedHelper.isImportEnabled(cluster)) {
+                validateEntityExists(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
+                validateFeedExtractionType(feed, cluster);
+                validateFeedImportArgs(cluster);
+                validateFeedImportFieldExcludes(cluster);
+            }
         }
 
         validateFeedStorage(feed);
@@ -553,4 +563,54 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
         }
     }
+
+    /**
+     * Validate the extraction and merge type combination. The only combination
+     * currently supported is:
+     *
+     * ExtractionType = FULL and MergeType = SNAPSHOT.
+     *
+     * @param feed Feed entity
+     * @param cluster Cluster referenced in the Feed definition
+     * @throws FalconException
+     */
+    private void validateFeedExtractionType(Feed feed, Cluster cluster) throws FalconException {
+        Extract extract = cluster.getImport().getSource().getExtract();
+
+        if (ExtractMethod.FULL == extract.getType())  {
+            if ((MergeType.SNAPSHOT != extract.getMergepolicy())
+                    || (extract.getDeltacolumn() != null)) {
+                throw new ValidationException(String.format("Feed %s is using FULL "
+                        + "extract method but specifies either a superfluous "
+                        + "deltacolumn  or a mergepolicy other than snapshot", feed.getName()));
+            }
+        }  else {
+            throw new ValidationException(String.format("Feed %s is using unsupported "
+                    + "extraction mechanism %s", feed.getName(), extract.getType().value()));
+        }
+    }
+
+    /**
+     * Validate import arguments.
+     * @param feedCluster Cluster referenced in the feed
+     */
+    private void validateFeedImportArgs(Cluster feedCluster) throws FalconException {
+        Map<String, String> args = FeedHelper.getImportArguments(feedCluster);
+        int numMappers = 1;
+        if (args.containsKey("--num-mappers")) {
+            numMappers = Integer.parseInt(args.get("--num-mappers"));
+        }
+        if ((numMappers > 1) && (!args.containsKey("--split-by"))) {
+            throw new ValidationException(String.format("Feed import expects "
+                    + "--split-by column when --num-mappers > 1"));
+        }
+    }
+
+    private void validateFeedImportFieldExcludes(Cluster feedCluster) throws FalconException {
+        if (FeedHelper.isFieldExcludes(feedCluster)) {
+            throw new ValidationException(String.format("Field excludes are not supported "
+                    + "currently in Feed import policy"));
+        }
+    }
 }
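
The --num-mappers/--split-by rule mirrors Sqoop's own requirement: parallel
imports need a column to partition the query on. A sketch of the check in
isolation (class and argument values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public final class SplitByRuleSketch {
        static boolean isValid(Map<String, String> args) {
            int numMappers = args.containsKey("--num-mappers")
                    ? Integer.parseInt(args.get("--num-mappers")) : 1;
            // More than one mapper requires a column to split the import on.
            return numMappers <= 1 || args.containsKey("--split-by");
        }

        public static void main(String[] unused) {
            Map<String, String> args = new HashMap<String, String>();
            args.put("--num-mappers", "4");
            System.out.println(isValid(args)); // false: no --split-by
            args.put("--split-by", "id");
            System.out.println(isValid(args)); // true
        }
    }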

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 4dd1c68..9c7a932 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -59,7 +59,7 @@ import java.util.concurrent.TimeUnit;
 public final class ConfigurationStore implements FalconService {
 
     private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] {
-        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, };
+        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE, };
     public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED,
         EntityType.CLUSTER, };
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index bd4c6cf..e4d9385 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.entity.v0;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
@@ -189,6 +190,16 @@ public final class EntityGraph implements ConfigurationChangeListener {
             Set<Node> clusterEdges = nodeEdges.get(clusterNode);
             feedEdges.add(clusterNode);
             clusterEdges.add(feedNode);
+
+            if (FeedHelper.isImportEnabled(cluster)) {
+                Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
+                if (!nodeEdges.containsKey(dbNode)) {
+                    nodeEdges.put(dbNode, new HashSet<Node>());
+                }
+                Set<Node> dbEdges = nodeEdges.get(dbNode);
+                feedEdges.add(dbNode);
+                dbEdges.add(feedNode);
+            }
         }
         return nodeEdges;
     }
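
With these edges in place a datasource participates in dependency queries just
like a cluster does. A sketch of what that enables (assuming the getDependents
accessor EntityGraph exposes elsewhere in Falcon; the entity name is
illustrative):

    import java.util.Set;

    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityGraph;
    import org.apache.falcon.entity.v0.EntityType;

    public final class DatasourceDepsSketch {
        public static void main(String[] args) throws Exception {
            Entity ds = ConfigurationStore.get().get(EntityType.DATASOURCE, "mysql-db");
            // Contains every feed whose import policy references the datasource;
            // EntityIntegrityChecker walks the same graph to veto deletes (below).
            Set<Entity> dependents = EntityGraph.get().getDependents(ds);
            System.out.println(dependents);
        }
    }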

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
index bd32852..4c7e913 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
@@ -46,6 +46,9 @@ public final class EntityIntegrityChecker {
         case FEED:
             return filter(deps, EntityType.PROCESS);
 
+        case DATASOURCE:
+            return filter(deps, EntityType.FEED);
+
         default:
             return null;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 8c3876c..25bbf0c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -20,10 +20,12 @@ package org.apache.falcon.metadata;
 
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
@@ -64,6 +66,10 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         case FEED:
             addFeedEntity((Feed) entity);
             break;
+        case DATASOURCE:
+            addDatasourceEntity((Datasource) entity);
+            break;
         default:
             throw new IllegalArgumentException("Invalid EntityType " + entityType);
         }
@@ -91,8 +97,25 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
             }
         }
+
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (FeedHelper.isImportEnabled(feedCluster)) {
+                addRelationToDatasource(feedVertex, FeedHelper.getImportDatasourceName(feedCluster),
+                        RelationshipLabel.DATASOURCE_IMPORT_EDGE);
+            }
+        }
+    }
+
+    public void addDatasourceEntity(Datasource dsEntity) {
+        LOG.info("Adding datasource entity: {}", dsEntity.getName());
+        Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY);
+
+        addUserRelation(dsVertex);
+        addColoRelation(dsEntity.getColo(), dsVertex);
+        addDataClassification(dsEntity.getTags(), dsVertex);
     }
 
     public void updateEntity(Entity oldEntity, Entity newEntity) {
         EntityType entityType = oldEntity.getEntityType();
         switch (entityType) {
@@ -177,6 +200,16 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         addEdge(fromVertex, clusterVertex, edgeLabel.getName());
     }
 
+    public void addRelationToDatasource(Vertex fromVertex, String datasourceName, RelationshipLabel edgeLabel) {
+        Vertex datasourceVertex = findVertex(datasourceName, RelationshipType.DATASOURCE_ENTITY);
+        if (datasourceVertex == null) { // datasource entity must exist before dependents reference it
+            LOG.error("Illegal State: Datasource entity vertex must exist for {}", datasourceName);
+            throw new IllegalStateException("Datasource entity vertex must exist: " + datasourceName);
+        }
+
+        addEdge(fromVertex, datasourceVertex, edgeLabel.getName());
+    }
+
     public void addInputFeeds(Inputs inputs, Vertex processVertex) {
         if (inputs == null) {
             return;

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index f485764..b709857 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -267,6 +267,39 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         }
     }
 
+
+    public void addImportedInstance(WorkflowExecutionContext context) throws FalconException {
+        String feedName = context.getOutputFeedNames();
+        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
+        String datasourceName = context.getDatasourceName();
+        String sourceClusterName = context.getSrcClusterName();
+
+        LOG.info("Computing import feed instance for : name= {} path= {}, in cluster: {} "
+                       +  "from datasource: {}", feedName,
+                feedInstanceDataPath, sourceClusterName, datasourceName);
+        String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName,
+                feedInstanceDataPath, context.getNominalTimeAsISO8601());
+        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+        LOG.info("Vertex exists? name={}, type={}, v={}",
+                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+        if (feedInstanceVertex == null) { // imported instances originate outside Falcon, so no vertex exists yet
+            LOG.info("{} instance vertex {} does not exist, add it",
+                    RelationshipType.FEED_INSTANCE, feedInstanceName);
+            feedInstanceVertex = addFeedInstance( // add a new instance vertex
+                    feedInstanceName, context, feedName, context.getSrcClusterName());
+        }
+        addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY,
+                RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601());
+        addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY,
+                RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601());
+    }
+
+    public String getImportInstanceName(WorkflowExecutionContext context) {
+        return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
+    }
+
     private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 56fbde0..cf2b651 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -287,8 +287,11 @@ public class MetadataMappingService
         case DELETE:
             onFeedInstanceEvicted(context);
             break;
+        case IMPORT:
+            onFeedInstanceImported(context);
+            break;
         default:
-            throw new IllegalArgumentException("Invalid EntityOperation" + entityOperation);
+            throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation);
         }
     }
 
@@ -328,4 +331,8 @@ public class MetadataMappingService
         LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
         instanceGraphBuilder.addEvictedInstance(context);
     }
+    private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException {
+        LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601());
+        instanceGraphBuilder.addImportedInstance(context);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index 5b312da..6d4bf46 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -28,6 +28,7 @@ public enum RelationshipLabel {
     PROCESS_CLUSTER_EDGE("runs-on"),
     FEED_PROCESS_EDGE("input"),
     PROCESS_FEED_EDGE("output"),
+    DATASOURCE_IMPORT_EDGE("import"),
 
     // instance edge labels
     INSTANCE_ENTITY_EDGE("instance-of"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java
new file mode 100644
index 0000000..3f9091f
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Helper class loader that fetches jars from HDFS location and loads into JVM.
+ */
+public class HdfsClassLoader extends URLClassLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsClassLoader.class);
+    private static Map<String, HdfsClassLoader>  classLoaderCache = new ConcurrentHashMap<String, HdfsClassLoader>();
+    private static final Object LOCK = new Object();
+
+    public static ClassLoader load(final String name, final List<String> jarHdfsPath) throws IOException {
+        LOG.info("ClassLoader cache size = " + classLoaderCache.size());
+        if (classLoaderCache.containsKey(name)) {
+            return classLoaderCache.get(name);
+        }
+
+        synchronized (LOCK) {
+            LOG.info("Copying jar files from HDFS to local dir");
+            final URL[] urls = copyHdfsJarFilesToTempDir(name, jarHdfsPath);
+            final ClassLoader parentClassLoader = HdfsClassLoader.class.getClassLoader();
+            LOG.info("Creating a new HdfsClassLoader for name = {0} with parent = {1} using classpath = {2}",
+                    name, parentClassLoader.toString(),  Arrays.toString(jarHdfsPath.toArray()));
+            HdfsClassLoader hdfsClassLoader = java.security.AccessController.doPrivileged(
+                    new java.security.PrivilegedAction<HdfsClassLoader>() {
+                        @Override
+                        public HdfsClassLoader run() {
+                            return new HdfsClassLoader(name, urls, parentClassLoader);
+                        }
+                    }
+            );
+            classLoaderCache.put(name, hdfsClassLoader);
+            return hdfsClassLoader;
+        }
+    }
+
+    private final ClassLoader realParent;
+
+    public HdfsClassLoader(String name, URL[] urls, ClassLoader parentClassLoader) {
+        // Pass null as the parent so that delegation is handled explicitly
+        // through the realParent member instead.
+        super(urls, null);
+        this.realParent = parentClassLoader;
+    }
+
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve)
+        throws ClassNotFoundException {
+
+        // Load through the parent class loader first and then fallback to this class loader.
+        try {
+            return realParent.loadClass(name);
+        } catch (Throwable t) {
+            return super.loadClass(name, resolve);
+        }
+    }
+
+    @Override
+    public URL getResource(String name) {
+        // This is the same as the jdk's getResource except the parent
+        // is taken from the realParent member instead of the parent member.
+        URL url = realParent.getResource(name);
+        if (url == null) {
+            url = findResource(name);
+        }
+        return url;
+    }
+
+    private static URL[] copyHdfsJarFilesToTempDir(String databaseName, List<String> jars) throws IOException {
+        List<URL> urls = new ArrayList<URL>();
+
+        final Configuration conf = new Configuration();
+        Path localPath = createTempDir(databaseName, conf);
+
+        for (String jar : jars) {
+            Path jarPath = new Path(jar);
+            final FileSystem fs = jarPath.getFileSystem(conf);
+            if (fs.isFile(jarPath) && jarPath.getName().endsWith(".jar")) {
+                LOG.info("Copying jarFile = " + jarPath);
+                fs.copyToLocalFile(jarPath, localPath);
+            }
+        }
+        urls.addAll(getJarsInPath(localPath.toUri().toURL()));
+
+        return urls.toArray(new URL[urls.size()]);
+    }
+
+    private static Path createTempDir(String databaseName, Configuration conf) throws IOException {
+        String tmpDir = System.getProperty("java.io.tmpdir");
+        if (StringUtils.isBlank(tmpDir)) {
+            tmpDir = "/tmp";
+        }
+        Path localPath = new Path(String.format("file://%s", tmpDir), databaseName);
+        localPath.getFileSystem(conf).mkdirs(localPath);
+        return localPath;
+    }
+
+    private static List<URL> getJarsInPath(URL fileURL) throws MalformedURLException {
+        List<URL> urls = new ArrayList<URL>();
+
+        File file = new File(fileURL.getPath());
+        if (file.isDirectory()) {
+            File[] jarFiles = file.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File file) {
+                    return file.isFile() && file.getName().endsWith(".jar");
+                }
+            });
+
+            for (File jarFile : jarFiles) {
+                urls.add(jarFile.toURI().toURL());
+            }
+
+            if (!fileURL.toString().endsWith("/")) {
+                fileURL = new URL(fileURL.toString() + "/");
+            }
+        }
+
+        urls.add(fileURL);
+        return urls;
+    }
+}
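
A usage sketch for the loader (the jar path and driver class are illustrative);
because loaders are cached by name, repeated validations of the same datasource
reuse one instance:

    import java.sql.Driver;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.falcon.util.HdfsClassLoader;

    public final class HdfsClassLoaderSketch {
        public static void main(String[] args) throws Exception {
            List<String> jars = Arrays.asList(
                    "hdfs://nn.example.com:8020/apps/falcon/libs/mysql-connector-java.jar");
            // Jars are copied to a local temp dir and served via URLClassLoader.
            ClassLoader loader = HdfsClassLoader.load("mysql-db", jars);
            Driver driver = (Driver) loader.loadClass("com.mysql.jdbc.Driver").newInstance();
            System.out.println(driver.getMajorVersion());
        }
    }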

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index ac7140c..915e8c2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -38,6 +38,7 @@ public enum WorkflowExecutionArgs {
     // Exactly same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING and
     // the values in conf.
     DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false),
+    DATASOURCE_NAME("datasource", "name of the datasource", false),
 
     // who
     WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9bfc51b..899165b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 
 /**
  * Captures the workflow execution context.
  */
@@ -74,7 +75,7 @@ public class WorkflowExecutionContext {
      * Entity operations supported.
      */
     public enum EntityOperations {
-        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD
+        GENERATE, DELETE, ARCHIVE, REPLICATE, CHMOD, IMPORT
     }
 
     public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
@@ -299,9 +300,12 @@ public class WorkflowExecutionContext {
     }
 
     public long getExecutionCompletionTime() {
         return creationTime;
     }
 
+    public String getDatasourceName() {
+        return getValue(WorkflowExecutionArgs.DATASOURCE_NAME);
+    }
+
     public long getWorkflowStartTime() {
         return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME));
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index aab9cee..a6d607b 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -58,6 +58,7 @@ public class AbstractTestBase {
     protected static final String FEED3_XML = "/config/feed/feed-0.3.xml";
     protected static final String FEED4_XML = "/config/feed/feed-0.4.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
+    protected static final String DATASOURCE_XML = "/config/datasource/datasource-0.1.xml";
     protected EmbeddedCluster dfsCluster;
     protected Configuration conf = new Configuration();
     private ConfigurationStore store;

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java b/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
index 640e87d..5a4d6ec 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityTypeTest.java
@@ -38,6 +38,7 @@ public class EntityTypeTest {
         Assert.assertTrue(EntityType.PROCESS.isSchedulable());
         Assert.assertTrue(EntityType.FEED.isSchedulable());
         Assert.assertFalse(EntityType.CLUSTER.isSchedulable());
+        Assert.assertFalse(EntityType.DATASOURCE.isSchedulable());
     }
 
     @Test
@@ -48,6 +49,8 @@ public class EntityTypeTest {
         Assert.assertEquals(EntityType.CLUSTER, EntityType.getEnum("cluSTER"));
         Assert.assertEquals(EntityType.PROCESS, EntityType.getEnum("process"));
         Assert.assertEquals(EntityType.PROCESS, EntityType.getEnum("pRocess"));
+        Assert.assertEquals(EntityType.DATASOURCE, EntityType.getEnum("datasource"));
+        Assert.assertEquals(EntityType.DATASOURCE, EntityType.getEnum("dataSource"));
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)


[2/3] falcon git commit: FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran

Posted by so...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 4020d36..60bf1be 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -28,14 +28,25 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.Argument;
+import org.apache.falcon.entity.v0.feed.Arguments;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Extract;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.FieldsType;
+import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
+import org.apache.falcon.entity.v0.feed.Import;
 import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Source;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Validity;
+
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
@@ -723,7 +734,6 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(result, expected);
     }
 
     @Test
     public void testIsLifeCycleEnabled() throws Exception {
         Feed feed = new Feed();
 
@@ -849,6 +859,44 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(4)"));
     }
 
+    @Test
+    public void testFeedImportSnapshot() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
+        Assert.assertNotNull(feed.getClusters().getClusters());
+        Assert.assertNotNull(feed.getClusters().getClusters().get(0));
+        Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity());
+        Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity().getStart());
+        Assert.assertNotNull(startInstResult);
+        Assert.assertNotNull(feedCluster.getValidity().getStart());
+        Assert.assertEquals(getDate("2012-02-07 00:00 UTC"), feedCluster.getValidity().getStart());
+        Assert.assertTrue(FeedHelper.isImportEnabled(feedCluster));
+        Assert.assertEquals(MergeType.SNAPSHOT, FeedHelper.getImportMergeType(feedCluster));
+        Assert.assertNotEquals(startInstResult, feedCluster.getValidity().getStart());
+    }
+
+    @Test
+    public void testFeedImportFields() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
+        List<String> fieldList = FeedHelper.getFieldList(feedCluster);
+        Assert.assertEquals(2, fieldList.size());
+        Assert.assertFalse(FeedHelper.isFieldExcludes(feedCluster));
+    }
+
+    @Test
+    public void testFeedImportAppend() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = importFeedAppend(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster);
+        Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart());
+    }
+
     private Validity getFeedValidity(String start, String end) throws ParseException {
         Validity validity = new Validity();
         validity.setStart(getDate(start));
@@ -881,6 +929,11 @@ public class FeedHelperTest extends AbstractTestBase {
 
     private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
         throws FalconException, ParseException {
+        return publishFeed(cluster, frequency, start, end, null);
+    }
+
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end, Import imp)
+        throws FalconException, ParseException {
 
         Feed feed = new Feed();
         feed.setName("feed");
@@ -889,6 +942,8 @@ public class FeedHelperTest extends AbstractTestBase {
         feed.setTimezone(UTC);
         Clusters fClusters = new Clusters();
         org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setType(ClusterType.SOURCE);
+        fCluster.setImport(imp);
         fCluster.setName(cluster.getName());
         fCluster.setValidity(getFeedValidity(start, end));
         fClusters.getClusters().add(fCluster);
@@ -913,4 +968,54 @@ public class FeedHelperTest extends AbstractTestBase {
         process.setFrequency(f);
         return process;
     }
+
+    private Feed importFeedSnapshot(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+
+        Import imp = getAnImport(MergeType.SNAPSHOT);
+        Feed feed = publishFeed(cluster, frequency, start, end, imp);
+        return feed;
+    }
+
+    private Feed importFeedAppend(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+
+        Import imp = getAnImport(MergeType.APPEND);
+        Feed feed = publishFeed(cluster, frequency, start, end, imp);
+        return feed;
+    }
+
+    private Import getAnImport(MergeType mergeType) {
+        Extract extract = new Extract();
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(mergeType);
+
+        FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
+        fieldInclude.getFields().add("id");
+        fieldInclude.getFields().add("name");
+        FieldsType fields = new FieldsType();
+        fields.setIncludes(fieldInclude);
+
+        Source source = new Source();
+        source.setName("test-db");
+        source.setTableName("test-table");
+        source.setExtract(extract);
+        source.setFields(fields);
+
+        Argument a1 = new Argument();
+        a1.setName("--split_by");
+        a1.setValue("id");
+        Argument a2 = new Argument();
+        a2.setName("--num-mappers");
+        a2.setValue("2");
+        Arguments args = new Arguments();
+        List<Argument> argList = args.getArguments();
+        argList.add(a1);
+        argList.add(a2);
+
+        Import imp = new Import();
+        imp.setSource(source);
+        imp.setArguments(args);
+        return imp;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
new file mode 100644
index 0000000..9567eab
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity.parser;
+
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+
+/**
+ * Test class for Datasource Entity.
+ */
+public class DatasourceEntityParserTest extends AbstractTestBase {
+
+    private final DatasourceEntityParser datasourceEntityParser =
+            (DatasourceEntityParser) EntityParserFactory.getParser(EntityType.DATASOURCE);
+    private final FeedEntityParser feedEntityParser =
+            (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        cleanupStore();
+    }
+
+    @Test
+    public void testDatasourceEntity() throws Exception {
+
+        InputStream stream = this.getClass().getResourceAsStream("/config/datasource/datasource-0.1.xml");
+        Datasource datasource = datasourceEntityParser.parse(stream);
+
+        ConfigurationStore store = ConfigurationStore.get();
+        store.publish(EntityType.DATASOURCE, datasource);
+
+        Datasource databaseEntity = EntityUtil.getEntity(EntityType.DATASOURCE, datasource.getName());
+        Assert.assertEquals("test-hsql-db", datasource.getName());
+        Assert.assertEquals("test-hsql-db", databaseEntity.getName());
+        Assert.assertEquals("hsql", databaseEntity.getType().value());
+        Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz());
+    }
+
+    @Test
+    public void testDatasourcePasswordFileEntity() throws Exception {
+
+        InputStream stream = this.getClass().getResourceAsStream("/config/datasource/datasource-file-0.1.xml");
+        Datasource datasource = datasourceEntityParser.parse(stream);
+        ConfigurationStore store = ConfigurationStore.get();
+        store.publish(EntityType.DATASOURCE, datasource);
+
+        Datasource databaseEntity = EntityUtil.getEntity(EntityType.DATASOURCE, datasource.getName());
+        Assert.assertEquals("test-hsql-db", datasource.getName());
+        Assert.assertEquals("test-hsql-db", databaseEntity.getName());
+        Assert.assertEquals("hsql", databaseEntity.getType().value());
+        Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 905be68..7f4abce 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -30,12 +30,16 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.ActionType;
+import org.apache.falcon.entity.v0.feed.Argument;
 import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Partitions;
 import org.apache.falcon.entity.v0.feed.Property;
@@ -57,6 +61,7 @@ import javax.xml.bind.Unmarshaller;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.util.Map;
 
 import static org.testng.AssertJUnit.assertEquals;
 
@@ -92,6 +97,11 @@ public class FeedEntityParserTest extends AbstractTestBase {
         LifecyclePolicyMap.get().init();
         CurrentUser.authenticate(FalconTestUtil.TEST_USER_2);
         modifiableFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
+        Unmarshaller dsUnmarshaller = EntityType.DATASOURCE.getUnmarshaller();
+        Datasource ds = (Datasource) dsUnmarshaller.unmarshal(this.getClass()
+                .getResourceAsStream(DATASOURCE_XML));
+        ds.setName("test-hsql-db");
+        store.publish(EntityType.DATASOURCE, ds);
     }
 
     @Test(expectedExceptions = ValidationException.class)
@@ -949,7 +959,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
     public void testValidateACLForArchiveReplication() throws Exception {
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         Assert.assertTrue(Boolean.valueOf(
-            StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
+                StartupProperties.get().getProperty("falcon.security.authorization.enabled")));
 
         CurrentUser.authenticate(USER);
         try {
@@ -986,6 +996,148 @@ public class FeedEntityParserTest extends AbstractTestBase {
     }
 
     @Test
+    public void testImportFeedSqoop() throws Exception {
+
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed feed = parser.parseAndValidate(feedStream);
+        final org.apache.falcon.entity.v0.feed.Cluster srcCluster = feed.getClusters().getClusters().get(0);
+        Assert.assertEquals("test-hsql-db", FeedHelper.getImportDatasourceName(srcCluster));
+        Assert.assertEquals("customer", FeedHelper.getImportDataSourceTableName(srcCluster));
+        Assert.assertEquals(2, srcCluster.getImport().getSource().getFields().getIncludes().getFields().size());
+    }
+
+    @Test
+    public void testImportFeedSqoopMinimal() throws Exception {
+
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-noargs-0.1.xml");
+        Feed feed = parser.parseAndValidate(feedStream);
+        final org.apache.falcon.entity.v0.feed.Cluster srcCluster = feed.getClusters().getClusters().get(0);
+        Assert.assertEquals("test-hsql-db", FeedHelper.getImportDatasourceName(srcCluster));
+        Assert.assertEquals("customer", FeedHelper.getImportDataSourceTableName(srcCluster));
+        Map<String, String> args = FeedHelper.getImportArguments(srcCluster);
+        Assert.assertEquals(0, args.size());
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedSqoopExcludeFields() throws Exception {
+
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-exclude-fields-0.1.xml");
+        Feed feed = parser.parseAndValidate(feedStream);
+        Assert.fail("An exception should have been thrown: Feed Import policy not yet implement Field exclusion.");
+    }
+
+    @Test
+    public void testImportFeedSqoopArgs() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Arguments args =
+                importFeed.getClusters().getClusters().get(0).getImport().getArguments();
+
+        Argument splitByArg = new Argument();
+        splitByArg.setName("--split-by");
+        splitByArg.setValue("id");
+
+        Argument numMappersArg = new Argument();
+        numMappersArg.setName("--num-mappers");
+        numMappersArg.setValue("3");
+
+        args.getArguments().clear();
+        args.getArguments().add(numMappersArg);
+        args.getArguments().add(splitByArg);
+
+        parser.validate(importFeed);
+    }
+
+    @Test
+    public void testImportFeedSqoopArgsSplitBy() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Arguments args =
+                importFeed.getClusters().getClusters().get(0).getImport().getArguments();
+        Argument splitByArg = new Argument();
+        splitByArg.setName("--split-by");
+        splitByArg.setValue("id");
+
+        args.getArguments().clear();
+        args.getArguments().add(splitByArg);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedSqoopArgsNumMapper() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Arguments args =
+                importFeed.getClusters().getClusters().get(0).getImport().getArguments();
+        Argument numMappersArg = new Argument();
+        numMappersArg.setName("--num-mappers");
+        numMappersArg.setValue("2");
+
+        args.getArguments().clear();
+        args.getArguments().add(numMappersArg);
+
+        parser.validate(importFeed);
+        Assert.fail("An exception should have been thrown: Feed Import should specify "
+                + "--split-by column along with --num-mappers");
+    }
+
+    @Test
+    public void testImportFeedExtractionType1() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Extract extract =
+                importFeed.getClusters().getClusters().get(0).getImport().getSource().getExtract();
+
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(MergeType.SNAPSHOT);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedExtractionType2() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Extract extract =
+                importFeed.getClusters().getClusters().get(0).getImport().getSource().getExtract();
+
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(MergeType.APPEND);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = ValidationException.class)
+    public void testImportFeedExtractionType3() throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream("/config/feed/feed-import-0.1.xml");
+        Feed importFeed = parser.parse(inputStream);
+
+        org.apache.falcon.entity.v0.feed.Extract extract =
+                importFeed.getClusters().getClusters().get(0).getImport().getSource().getExtract();
+
+        extract.setType(ExtractMethod.INCREMENTAL);
+        extract.setMergepolicy(MergeType.APPEND);
+
+        parser.validate(importFeed);
+    }
+
+    @Test (expectedExceptions = {ValidationException.class, FalconException.class})
+    public void testImportFeedSqoopInvalid() throws Exception {
+
+        InputStream feedStream = this.getClass().getResourceAsStream("/config/feed/feed-import-invalid-0.1.xml");
+        parser.parseAndValidate(feedStream);
+        Assert.fail("ValidationException should have been thrown");
+    }
+
+    @Test
     public void testValidateEmailNotification() throws Exception {
         Feed feedNotification = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
                 (FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
@@ -1040,5 +1192,4 @@ public class FeedEntityParserTest extends AbstractTestBase {
         feed.getClusters().getClusters().get(0).getValidity().setEnd(null);
         parser.validate(feed);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
index 3863b11..f49362f 100644
--- a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java
@@ -20,9 +20,20 @@ package org.apache.falcon.entity.v0;
 
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Argument;
+import org.apache.falcon.entity.v0.feed.Arguments;
 import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Extract;
+import org.apache.falcon.entity.v0.feed.ExtractMethod;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.FieldsType;
+import org.apache.falcon.entity.v0.feed.FieldIncludeExclude;
+import org.apache.falcon.entity.v0.feed.Import;
+import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Source;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
@@ -31,6 +42,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -122,6 +134,57 @@ public class EntityGraphTest extends AbstractTestBase {
         return f1;
     }
 
+    private Feed addFeedImport(String feed, Cluster cluster, Datasource ds) {
+
+        Feed f1 = new Feed();
+        f1.setName(feed);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName(cluster.getName());
+        feedCluster.setType(ClusterType.SOURCE);
+        Clusters clusters = new Clusters();
+        clusters.getClusters().add(feedCluster);
+        f1.setClusters(clusters);
+
+        Import imp = getAnImport(MergeType.SNAPSHOT, ds);
+        f1.getClusters().getClusters().get(0).setImport(imp);
+        return f1;
+    }
+
+    private Import getAnImport(MergeType mergeType, Datasource ds) {
+        Extract extract = new Extract();
+        extract.setType(ExtractMethod.FULL);
+        extract.setMergepolicy(mergeType);
+
+        FieldsType fields = new FieldsType();
+        FieldIncludeExclude fieldInclude = new FieldIncludeExclude();
+        fieldInclude.getFields().add("id");
+        fieldInclude.getFields().add("name");
+        fields.setIncludes(fieldInclude);
+
+        Source source = new Source();
+        source.setName(ds.getName());
+        source.setTableName("test-table");
+        source.setExtract(extract);
+        source.setFields(fields);
+
+        Argument a1 = new Argument();
+        a1.setName("--split_by");
+        a1.setValue("id");
+        Argument a2 = new Argument();
+        a2.setName("--num-mappers");
+        a2.setValue("2");
+        Arguments args = new Arguments();
+        List<Argument> argList = args.getArguments();
+        argList.add(a1);
+        argList.add(a2);
+
+        Import imp = new Import();
+        imp.setSource(source);
+        imp.setArguments(args);
+        return imp;
+    }
+
     private void attachInput(Process process, Feed feed) {
         if (process.getInputs() == null) {
             process.setInputs(new Inputs());
@@ -283,4 +346,63 @@ public class EntityGraphTest extends AbstractTestBase {
     @Test
     public void testOnChange() throws Exception {
     }
+
+    @Test
+    public void testOnAddImport() throws Exception {
+
+        Datasource ds = new Datasource();
+        ds.setName("test-db");
+        ds.setColo("c1");
+
+        Cluster cluster = new Cluster();
+        cluster.setName("ci1");
+        cluster.setColo("c1");
+
+        Feed f1 = addFeedImport("fi1", cluster, ds);
+
+        store.publish(EntityType.CLUSTER, cluster);
+        store.publish(EntityType.DATASOURCE, ds);
+        store.publish(EntityType.FEED, f1);
+
+        Set<Entity> entities = graph.getDependents(cluster);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(ds);
+        Assert.assertEquals(entities.size(), 1);
+        Assert.assertTrue(entities.contains(f1));
+
+        entities = graph.getDependents(f1);
+        Assert.assertEquals(entities.size(), 2);
+        Assert.assertTrue(entities.contains(cluster));
+        Assert.assertTrue(entities.contains(ds));
+
+        store.remove(EntityType.FEED, "fi1");
+        store.remove(EntityType.DATASOURCE, "test-db");
+        store.remove(EntityType.CLUSTER, "ci1");
+    }
+
+    @Test
+    public void testOnRemoveDatasource() throws Exception {
+
+        Datasource ds = new Datasource();
+        ds.setName("test-db");
+        ds.setColo("c1");
+
+        Cluster cluster = new Cluster();
+        cluster.setName("ci1");
+        cluster.setColo("c1");
+
+        Feed f1 = addFeedImport("fi1", cluster, ds);
+
+        store.publish(EntityType.CLUSTER, cluster);
+        store.publish(EntityType.DATASOURCE, ds);
+        store.publish(EntityType.FEED, f1);
+
+        store.remove(EntityType.DATASOURCE, "test-db");
+
+        Set<Entity> entities = graph.getDependents(f1);
+        Assert.assertEquals(1, entities.size());
+        Assert.assertTrue(entities.contains(cluster));
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/datasource/datasource-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/datasource/datasource-0.1.xml b/common/src/test/resources/config/datasource/datasource-0.1.xml
new file mode 100644
index 0000000..5b09f10
--- /dev/null
+++ b/common/src/test/resources/config/datasource/datasource-0.1.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="HSQL database on west coast" type="hsql" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/datasource/datasource-file-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/datasource/datasource-file-0.1.xml b/common/src/test/resources/config/datasource/datasource-file-0.1.xml
new file mode 100644
index 0000000..3ee40ed
--- /dev/null
+++ b/common/src/test/resources/config/datasource/datasource-file-0.1.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="HSQL database on west coast" type="hsql" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-file">
+                <userName>SA</userName>
+                <passwordFile>jail://global:00/falcon/passwordfile</passwordFile>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-file">
+                <userName>SA</userName>
+                <passwordFile>jail://global:00/falcon/passwordfile</passwordFile>
+            </credential>
+        </interface>
+
+        <credential type="password-file">
+            <userName>SA</userName>
+            <passwordFile>jail://global:00/falcon/passwordfile</passwordFile>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml b/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml
new file mode 100644
index 0000000..04fe737
--- /dev/null
+++ b/common/src/test/resources/config/datasource/datasource-invalid-0.1.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<datasource colo="west-coast" description="A new database on west coast" type="xyz" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1">
+    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-0.1.xml b/common/src/test/resources/config/feed/feed-import-0.1.xml
new file mode 100644
index 0000000..798d6b0
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-0.1.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml b/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml
new file mode 100644
index 0000000..5a6fcd9
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-exclude-fields-0.1.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <excludes>
+                            <field>id</field>
+                            <field>name</field>
+                        </excludes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml b/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml
new file mode 100644
index 0000000..9428bce
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-invalid-0.1.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml b/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml
new file mode 100644
index 0000000..c96249c
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-import-noargs-0.1.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer data" name="CustomerFeed" xmlns="uri:falcon:feed:0.1">
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <import>
+                <source name="test-hsql-db" tableName="customer">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                </source>
+            </import>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index e07fe12..b357c44 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -98,6 +98,61 @@ A key-value pair, which are propagated to the workflow engine.
 Ideally JMS impl class name of messaging engine (brokerImplClass) 
 should be defined here.
 
+---++ Datasource Specification
+
+The datasource entity contains the connection information required to connect to a data source, such as a MySQL
+database. The datasource XSD specification is available here.
+A datasource contains read and write interfaces, which Falcon uses to import data from and export data to the
+datasource. Feeds on-boarded to Falcon reference a datasource by its name.
+
+Following are the tags defined in a datasource.xml:
+
+<verbatim>
+<datasource colo="west-coast" description="Customer database on west coast" type="mysql"
+ name="test-hsql-db" xmlns="uri:falcon:datasource:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+</verbatim>
+
+The colo specifies the colo to which the datasource belongs, and name is the name of the datasource, which has to
+be unique.
+
+---+++ Interfaces
+
+A datasource has two interfaces as described below:
+<verbatim>
+    <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db"/>
+</verbatim>
+
+A readonly interface specifies the endpoint and protocol used to connect to the datasource.
+Falcon uses this interface when importing data from the datasource into HDFS.
+
+<verbatim>
+<interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+</verbatim>
+
+A write interface specifies the endpoint and protocol used to write to the datasource.
+Falcon uses this interface to export data from HDFS to the datasource.
+
+<verbatim>
+<credential type="password-text">
+    <userName>SA</userName>
+    <passwordText></passwordText>
+</credential>
+</verbatim>
+
+A credential is associated with an interface (read or write) and provides the user name and password used to
+authenticate to the datasource.
+
+<verbatim>
+<credential type="password-text">
+     <userName>SA</userName>
+     <passwordFile>hdfs-file-path</passwordText>
+</credential>
+</verbatim>
+
+The credential can alternatively be specified via a password file stored in HDFS. This file should be readable
+only by the user.
+
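+Putting it together, a complete datasource definition, mirroring the HSQL example used in the Falcon tests (the
+endpoint, credentials and driver jar path are illustrative), looks like:
+
+<verbatim>
+<datasource colo="west-coast" description="HSQL database on west coast" type="hsql" name="test-hsql-db"
+            xmlns="uri:falcon:datasource:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+    <driver>
+        <clazz>org.hsqldb.jdbcDriver</clazz>
+        <jar>/user/oozie/share/lib/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+    <ACL owner="testuser" group="group" permission="0x755"/>
+</datasource>
+</verbatim>
+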
 ---++ Feed Specification
 The Feed XSD specification is available here.
 A Feed defines various attributes of feed like feed location, frequency, late-arrival handling and retention policies.
@@ -244,6 +299,35 @@ expressions like frequency. slaLow is intended to serve for alerting for feed in
 availability SLAs. slaHigh is intended to serve for reporting the feeds which missed their SLAs. SLAs are relative to
 feed instance time.
 
+---+++ Import
+
+<verbatim>
+<import>
+    <source name="test-hsql-db" tableName="customer">
+        <extract type="full">
+            <mergepolicy>snapshot</mergepolicy>
+         </extract>
+         <fields>
+            <includes>
+                <field>id</field>
+                <field>name</field>
+            </includes>
+         </fields>
+    </source>
+    <arguments>
+        <argument name="--split-by" value="id"/>
+        <argument name="--num-mappers" value="2"/>
+    </arguments>
+</import>
+</verbatim>
+
+A feed can have an import policy associated with it. The source name references the datasource entity from which
+the data will be imported to HDFS. The tableName specifies the table or topic to be imported from the datasource.
+The extract type specifies the pull mechanism: full or incremental. The full extract method extracts all the data
+from the datasource; the incremental extraction method is still being implemented. The mergepolicy determines how
+the data is to be laid out on HDFS. The snapshot layout creates a snapshot of the data on HDFS using the feed's
+location specification. Fields is used to specify the projection columns. Feed import from a database uses Sqoop
+under the hood, so any advanced Sqoop options can be specified via the arguments.
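+
+For illustration, the import policy above would translate into a Sqoop invocation roughly of the following form
+(the exact command is assembled by Falcon at workflow generation time and may include additional options):
+
+<verbatim>
+sqoop import --connect jdbc:hsqldb:localhost/db1 --username SA --password <password> \
+    --table customer --columns id,name --split-by id --num-mappers 2 \
+    --target-dir /projects/falcon/clicks
+</verbatim>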
 
 ---+++ Late Arrival
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d994866..e001a7f 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -29,6 +29,9 @@ $FALCON_HOME/bin/falcon entity -submit -type cluster -file /cluster/definition.x
 
 Submit option is used to set up entity definition.
 
+Usage:
+$FALCON_HOME/bin/falcon entity -submit -type [cluster|datasource|feed|process] -file <entity-definition.xml>
+
 Example: 
 $FALCON_HOME/bin/falcon entity -submit -type cluster -file /cluster/definition.xml
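+
+A datasource entity is submitted the same way, for example:
+$FALCON_HOME/bin/falcon entity -submit -type datasource -file /datasource/definition.xml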
 
@@ -65,7 +68,7 @@ Usage:
 Delete removes the submitted entity definition for the specified entity and put it into the archive.
 
 Usage:
-$FALCON_HOME/bin/falcon entity  -type [cluster|feed|process] -name <<name>> -delete
+$FALCON_HOME/bin/falcon entity  -type [cluster|datasource|feed|process] -name <<name>> -delete
 
 ---+++List
 
@@ -75,7 +78,7 @@ Usage:
 $FALCON_HOME/bin/falcon entity -list
 
 Optional Args : -fields <<field1,field2>>
--type <<[cluster|feed|process],[cluster|feed|process]>>
+-type <<[cluster|datasource|feed|process],[cluster|datasource|feed|process]>>
 -nameseq <<namesubsequence>> -tagkeys <<tagkeyword1,tagkeyword2>>
 -filterBy <<field1:value1,field2:value2>> -tags <<tagkey=tagvalue,tagkey=tagvalue>>
 -orderBy <<field>> -sortOrder <<sortOrder>> -offset 0 -numResults 10
@@ -98,8 +101,8 @@ Optional Args : -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" -fie
 
 ---+++Update
 
-Update operation allows an already submitted/scheduled entity to be updated. Cluster update is currently
-not allowed.
+Update operation allows an already submitted/scheduled entity to be updated. Cluster and datasource updates are
+currently not allowed.
 
 Usage:
 $FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>> -update -file <<path_to_file>>
@@ -123,21 +126,23 @@ Optional Arg : -skipDryRun. When this argument is specified, Falcon skips oozie
 Status returns the current status of the entity.
 
 Usage:
-$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -status
+$FALCON_HOME/bin/falcon entity -type [cluster|datasource|feed|process] -name <<name>> -status
 
 ---+++Dependency
 
-With the use of dependency option, we can list all the entities on which the specified entity is dependent. For example for a feed, dependency return the cluster name and for process it returns all the input feeds, output feeds and cluster names.
+With the use of the dependency option, we can list all the entities on which the specified entity is dependent.
+For example, for a feed, dependency returns the cluster name, and for a process it returns all the input feeds,
+output feeds and cluster names.
 
 Usage:
-$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -dependency
+$FALCON_HOME/bin/falcon entity -type [cluster|datasource|feed|process] -name <<name>> -dependency
 
 ---+++Definition
 
 Definition option returns the entity definition submitted earlier during submit step.
 
 Usage:
-$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -definition
+$FALCON_HOME/bin/falcon entity -type [cluster|datasource|feed|process] -name <<name>> -definition
 
 
 ---+++Lookup
@@ -460,7 +465,7 @@ $FALCON_HOME/bin/falcon metadata -edge -id Q9n-Q-5g
 
 Lists of all dimensions of given type. If the user provides optional param cluster, only the dimensions related to the cluster are listed.
 Usage:
-$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines]
+$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|datasource_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines]
 
 Optional Args : -cluster <<cluster name>>
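+
+Example (the cluster name is illustrative):
+$FALCON_HOME/bin/falcon metadata -list -type datasource_entity -cluster primary-cluster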
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 515562a..6bc5edc 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -251,7 +251,8 @@ public class JMSMessageProducer {
     private String[] getFeedPaths() throws Exception {
         WorkflowExecutionContext.EntityOperations operation = context.getOperation();
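+        // IMPORT also produces output feed instances, so notify on output paths as for GENERATE and REPLICATE.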
         if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
-                || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {
+                || operation == WorkflowExecutionContext.EntityOperations.REPLICATE
+                || operation == WorkflowExecutionContext.EntityOperations.IMPORT) {
             LOG.debug("Returning instance paths: " + context.getOutputFeedInstancePaths());
             return context.getOutputFeedInstancePathsList();
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
new file mode 100644
index 0000000..45f46d7
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.DatasourceHelper;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.datasource.Credentialtype;
+import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Builds Datasource import workflow for Oozie.
+ */
+
+public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
+    protected static final String IMPORT_SQOOP_ACTION_TEMPLATE = "/action/feed/import-sqoop-database-action.xml";
+    protected static final String IMPORT_ACTION_NAME = "db-import-sqoop";
+
+    private static final String ARG_SEPARATOR = " ";
+
+    public DatabaseImportWorkflowBuilder(Feed entity) { super(entity); }
+
+    @Override
+    protected WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.IMPORT;
+    }
+
+    @Override
+    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException {
+
+        addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);
+
+        ACTION sqoopImport = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
+        addTransition(sqoopImport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(sqoopImport);
+
+        //Add post-processing actions
+        ACTION success = getSuccessPostProcessAction();
+        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = getFailPostProcessAction();
+        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
+
+        // build the sqoop command and put it in the properties
+        String sqoopCmd = buildSqoopCommand(cluster, entity);
+        // The command may embed credentials, so avoid logging it at INFO level.
+        LOG.debug("Sqoop command: " + sqoopCmd);
+        Properties props = new Properties();
+        props.put("sqoopCommand", sqoopCmd);
+        return props;
+    }
+
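+    // Assembles the full Sqoop import command. For a MySQL datasource the result would
+    // resemble (all values illustrative):
+    //   import --driver com.mysql.jdbc.Driver --connect jdbc:mysql://host:3306/db --table customers
+    //   --username scott --password-file hdfs:///user/falcon/.pwdfile --num-mappers 1
+    //   --delete-target-dir --target-dir ${coord:dataOut('import-output')}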
+    private String buildSqoopCommand(Cluster cluster, Feed feed) throws FalconException {
+        Map<String, String> extraArgs = getArguments(cluster);
+        StringBuilder sqoopCmd = new StringBuilder();
+        sqoopCmd.append("import").append(ARG_SEPARATOR);
+        buildDriverArgs(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildConnectArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildTableArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildUserPasswordArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        buildNumMappers(sqoopCmd, extraArgs).append(ARG_SEPARATOR);
+        buildArguments(sqoopCmd, extraArgs).append(ARG_SEPARATOR);
+        buildTargetDirArg(sqoopCmd, cluster).append(ARG_SEPARATOR);
+        return sqoopCmd.toString();
+    }
+
+    private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        Datasource db = DatasourceHelper.getDatasource(feedCluster);
+        if ((db.getDriver() != null) && (db.getDriver().getClazz() != null)) {
+            builder.append("--driver").append(ARG_SEPARATOR).append(db.getDriver().getClazz());
+        }
+        return builder;
+    }
+
+    private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        return builder.append("--connect").append(ARG_SEPARATOR)
+                .append(DatasourceHelper.getReadOnlyEndpoint(DatasourceHelper.getDatasource(feedCluster)));
+    }
+
+    private StringBuilder buildTableArg(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        return builder.append("--table").append(ARG_SEPARATOR)
+                                    .append(FeedHelper.getImportDataSourceTableName(feedCluster));
+    }
+
+    private StringBuilder buildUserPasswordArg(StringBuilder builder, Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        Datasource db = DatasourceHelper.getDatasource(feedCluster);
+        Pair<String, String> userPasswdInfo = DatasourceHelper.getReadPasswordInfo(db);
+        builder.append("--username").append(ARG_SEPARATOR)
+                .append(userPasswdInfo.first)
+                .append(ARG_SEPARATOR);
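+        // Use --password-file when the datasource credential is a password file; otherwise pass the password inline.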
+        if (DatasourceHelper.getReadPasswordType(db) == Credentialtype.PASSWORD_FILE) {
+            builder.append("--password-file");
+        } else {
+            builder.append("--password");
+        }
+        builder.append(ARG_SEPARATOR).append(userPasswdInfo.second);
+        return builder;
+    }
+
+    private StringBuilder buildTargetDirArg(StringBuilder builder, Cluster cluster)
+        throws FalconException {
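+        // --delete-target-dir keeps re-runs idempotent; the target dir resolves to the coordinator's output event.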
+        return builder.append("--delete-target-dir").append(ARG_SEPARATOR)
+                .append("--target-dir").append(ARG_SEPARATOR)
+                .append(String.format("${coord:dataOut('%s')}",
+                        FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
+    }
+
+    private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
+        throws FalconException {
+        for(Map.Entry<String, String> e : extraArgs.entrySet()) {
+            builder.append(e.getKey()).append(ARG_SEPARATOR).append(e.getValue()).append(ARG_SEPARATOR);
+        }
+        return builder;
+    }
+
+    /**
+     * Defaults --num-mappers to 1 when it is not specified.
+     *
+     * Feed validation ensures that a --split-by column is supplied whenever --num-mappers is greater than 1.
+     *
+     * @param builder command builder
+     * @param extraArgs map of extra arguments
+     * @return the command builder
+     */
+    private StringBuilder buildNumMappers(StringBuilder builder, Map<String, String> extraArgs) {
+        if (!extraArgs.containsKey("--num-mappers")) {
+            builder.append("--num-mappers").append(ARG_SEPARATOR).append(1);
+        }
+        return builder;
+    }
+
+    private Map<String, String> getArguments(Cluster cluster) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        return FeedHelper.getImportArguments(feedCluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java
new file mode 100644
index 0000000..70289d0
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/FeedImportCoordinatorBuilder.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds Oozie coordinator for database import.
+ */
+
+public class FeedImportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
+    public static final String IMPORT_DATASET_NAME = "import-dataset";
+
+    public static final String IMPORT_DATAOUT_NAME = "import-output";
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedImportCoordinatorBuilder.class);
+
+    public FeedImportCoordinatorBuilder(Feed entity) {
+        super(entity, LifeCycle.IMPORT);
+    }
+
+
+    @Override
+    public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster((Feed) entity, cluster.getName());
+        if (!FeedHelper.isImportEnabled(feedCluster)) {
+            return null;
+        }
+
+        if (feedCluster.getValidity().getEnd().before(new Date())) {
+            LOG.warn("Feed IMPORT is not applicable as Feed's end time for cluster {} is not in the future",
+                    cluster.getName());
+            return null;
+        }
+
+        COORDINATORAPP coord = new COORDINATORAPP();
+        initializeCoordAttributes(coord, (Feed) entity, cluster);
+        Properties props = createCoordDefaultConfiguration(getEntityName());
+        initializeOutputPath(coord, cluster, props);
+
+        props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+
+        WORKFLOW workflow = new WORKFLOW();
+        Path coordPath = getBuildPath(buildPath);
+        Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.IMPORT).build(cluster,
+                coordPath);
+        workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        props.putAll(wfProp);
+        workflow.setConfiguration(getConfig(props));
+        ACTION action = new ACTION();
+        action.setWorkflow(workflow);
+
+        coord.setAction(action);
+
+        Path marshalPath = marshal(cluster, coord, coordPath);
+        return Arrays.asList(getProperties(marshalPath, getEntityName()));
+    }
+
+    private void initializeOutputPath(COORDINATORAPP coord, Cluster cluster, Properties props)
+        throws FalconException {
+
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+
+        Storage storage = FeedHelper.createStorage(cluster, (Feed) entity);
+        SYNCDATASET syncdataset = createDataSet((Feed) entity, cluster, storage,
+                IMPORT_DATASET_NAME, LocationType.DATA);
+
+        if (syncdataset == null) {
+            return;
+        }
+        coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+        DATAOUT dataout = createDataOut(entity);
+        coord.getOutputEvents().getDataOut().add(dataout);
+    }
+
+    private DATAOUT createDataOut(Feed feed) {
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(IMPORT_DATAOUT_NAME);
+        dataout.setDataset(IMPORT_DATASET_NAME);
+        dataout.setInstance("${coord:current(0)}");
+        return dataout;
+    }
+
+    /**
+     * Creates the dataset. The start instance is set to the current date if the merge type is snapshot.
+     * Otherwise, the feed cluster's start date is used as the start instance.
+     *
+     * @param feed feed entity
+     * @param cluster cluster entity
+     * @param storage feed storage
+     * @param datasetName name of the dataset
+     * @param locationType location type for the URI template
+     * @return the dataset, or null when the storage has no URI template
+     * @throws FalconException
+     */
+    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+                                      String datasetName, LocationType locationType) throws FalconException {
+        SYNCDATASET syncdataset = new SYNCDATASET();
+        syncdataset.setName(datasetName);
+        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+        String uriTemplate = storage.getUriTemplate(locationType);
+        if (StringUtils.isBlank(uriTemplate)) {
+            return null;
+        }
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie expects the hcat scheme for table datasets
+        }
+        syncdataset.setUriTemplate(uriTemplate);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date initialInstance = FeedHelper.getImportInitalInstance(feedCluster);
+        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(initialInstance));
+        syncdataset.setTimezone(feed.getTimezone().getID());
+
+        if (StringUtils.isNotBlank(feed.getAvailabilityFlag())) {
+            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
+        } else {
+            syncdataset.setDoneFlag("");
+        }
+
+        return syncdataset;
+    }
+
+    /**
+     * Initializes the coordinator, using the current date as the start when the merge type is snapshot.
+     * Otherwise, the feed cluster's validity start is used as the coordinator start date.
+     *
+     * @param coord coordinator app
+     * @param feed feed entity
+     * @param cluster cluster entity
+     */
+    private void initializeCoordAttributes(COORDINATORAPP coord, Feed feed, Cluster cluster) {
+        coord.setName(getEntityName());
+        // for feeds with snapshot layout, the start date will be the time of scheduling since it dumps whole table
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        Date initialInstance = FeedHelper.getImportInitalInstance(feedCluster);
+        coord.setStart(SchemaHelper.formatDateUTC(initialInstance));
+        coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+        coord.setTimezone(entity.getTimezone().getID());
+        coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
new file mode 100644
index 0000000..4892ecb
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Builds oozie workflow for Datasource import.
+ */
+
+public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+
+    public ImportWorkflowBuilder(Feed feed) {
+        super(feed, LifeCycle.IMPORT);
+    }
+
+    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.IMPORT, entity).toString();
+        workflow.setName(wfName);
+        Properties p = getWorkflow(cluster, workflow);
+        marshal(cluster, workflow, buildPath);
+
+        Properties props = FeedHelper.getFeedProperties(entity);
+        if (props == null) {
+            props = new Properties();
+        }
+        props.putAll(getProperties(buildPath, wfName));
+        Properties defaultConfig = createDefaultConfiguration(cluster);
+        if (defaultConfig != null) {
+            props.putAll(defaultConfig);
+        }
+        Properties userWorkflowProps = FeedHelper.getUserWorkflowProperties(getLifecycle());
+        if (userWorkflowProps != null) {
+            props.putAll(userWorkflowProps);
+        }
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
+                String.format("${coord:dataOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE);
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), NONE);
+        props.setProperty("srcClusterName", "NA");
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+
+        String datasourceName = FeedHelper.getImportDatasourceName(FeedHelper.getCluster(entity, cluster.getName()));
+        if (StringUtils.isEmpty(datasourceName)) {
+            throw new FalconException("Datasource name is null or empty");
+        }
+
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), datasourceName);
+        props.putAll(p);
+        return props;
+    }
+
+    protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index a04ae95..55e07be 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -75,6 +75,9 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
             case REPLICATION:
                 return new FeedReplicationCoordinatorBuilder((Feed)entity);
 
+            case IMPORT:
+                return new FeedImportCoordinatorBuilder((Feed)entity);
+
             default:
                 throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + tag);
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 3213a70..026f79f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.HiveUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.datasource.DatasourceType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -131,6 +132,17 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
                     return new FSReplicationWorkflowBuilder(feed);
                 }
 
+            case IMPORT:
+                DatasourceType dsType = EntityUtil.getImportDatasourceType(cluster, feed);
+                if ((dsType == DatasourceType.MYSQL)
+                    || (dsType == DatasourceType.ORACLE)
+                    || (dsType == DatasourceType.HSQL)) {
+                    return new DatabaseImportWorkflowBuilder(feed);
+                } else {
+                    LOG.info("Import policy not implemented for DataSourceType : " + dsType);
+                }
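+                // Unsupported datasource types fall through and no import workflow is created.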
+                break;
+
             default:
                 throw new IllegalArgumentException("Unhandled type " + entity.getEntityType()
                        + ", lifecycle " + lifecycle);

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 0dc09ee..e45dfc5 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 
 import java.util.Arrays;
 import java.util.Properties;
@@ -82,7 +83,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         } else {
             props.put("availabilityFlag", entity.getAvailabilityFlag());
         }
-
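+        // Replication has no datasource; "NA" tells post-processing to ignore the -datasource argument.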
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index 9e55edf..9fbc5b2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -72,6 +72,11 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
             props.addAll(replicationProps);
         }
 
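+        // Import coordinators are built only when import is configured for the feed; buildCoords returns null otherwise.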
+        List<Properties> importProps = OozieCoordinatorBuilder.get(entity, Tag.IMPORT).buildCoords(cluster, buildPath);
+        if (importProps != null) {
+            props.addAll(importProps);
+        }
+
         if (!props.isEmpty()) {
             copySharedLibs(cluster, new Path(getLibPath(buildPath)));
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index f953557..b9e3848 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -110,6 +110,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
 
         props.put("falconInputFeeds", entity.getName());
         props.put("falconInPaths", IGNORE);
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 6e2a631..629485d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 
 import javax.xml.bind.JAXBElement;
 import java.util.Arrays;
@@ -153,7 +154,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
     protected Properties getWorkflowProperties(Feed feed) throws FalconException {
         Properties props = super.getWorkflowProperties(feed);
         props.put("availabilityFlag", "NA");
-
+        props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 3f49adb..dc9349f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -40,6 +40,7 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -129,6 +130,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         Properties props = new Properties();
         props.setProperty("srcClusterName", "NA");
         props.setProperty("availabilityFlag", "NA");
+        props.setProperty(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA");
         return props;
     }