Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/11 10:20:03 UTC

[3/5] falcon git commit: FALCON-1188 Falcon support for Hive Replication. Contributed by Venkat Ranganathan.

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/resources/log4j.xml b/addons/hivedr/src/main/resources/log4j.xml
new file mode 100644
index 0000000..f83a9a9
--- /dev/null
+++ b/addons/hivedr/src/main/resources/log4j.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!--
+    This is used for falcon packaging only.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.falcon" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="org.apache.hadoop" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="org.apache.hadoop.hive" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="console"/>
+    </root>
+
+</log4j:configuration>
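
For context on how the configuration above surfaces at runtime: any class under org.apache.falcon logs to the console appender at DEBUG, everything under org.apache.hadoop at INFO, and the ConversionPattern controls the rendered line. A minimal, hypothetical example (not part of this patch) of a class whose output would follow that pattern:

    package org.apache.falcon.hive.examples;   // hypothetical package; anything under org.apache.falcon matches the DEBUG logger above

    import org.apache.log4j.Logger;

    public class Log4jUsageExample {
        private static final Logger LOG = Logger.getLogger(Log4jUsageExample.class);

        public static void main(String[] args) {
            // Rendered by the console appender roughly as:
            // 2015-08-11 10:20:03,123 DEBUG - [main:] ~ starting hive replication (Log4jUsageExample:10)
            LOG.debug("starting hive replication");
            LOG.info("copied 5 events");
        }
    }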

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
new file mode 100644
index 0000000..bfeca8d
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hive;
+
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DBReplicationStatus;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for DBReplicationStatus.
+ */
+@Test
+public class DBReplicationStatusTest {
+
+    private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>();
+    private ReplicationStatus dbReplicationStatus;
+    private ReplicationStatus tableStatus1;
+
+    public DBReplicationStatusTest() {
+    }
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        dbReplicationStatus = new ReplicationStatus("source", "target", "jobname",
+                "Default1", null, ReplicationStatus.Status.FAILURE, 20L);
+        tableStatus1 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "Table1", ReplicationStatus.Status.SUCCESS, 20L);
+        tableStatuses.put("Table1", tableStatus1);
+
+    }
+
+    public void dBReplicationStatusSerializeTest() throws Exception {
+        DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses);
+
+        String expected = "{\n" + "    \"db_status\": {\n"
+                + "        \"sourceUri\": \"source\",\n" + "        \"targetUri\": \"target\",\n"
+                + "        \"jobName\": \"jobname\",\n" + "        \"database\": \"default1\",\n"
+                + "        \"status\": \"FAILURE\",\n" + "        \"eventId\": 20\n" + "    },\n"
+                + "    \"table_status\": {\"table1\": {\n" + "        \"sourceUri\": \"source\",\n"
+                + "        \"targetUri\": \"target\",\n" + "        \"jobName\": \"jobname\",\n"
+                + "        \"database\": \"default1\",\n" + "        \"table\": \"table1\",\n"
+                + "        \"status\": \"SUCCESS\",\n" + "        \"eventId\": 20\n" + "    }}\n" + "}";
+        String actual = replicationStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+    }
+
+    public void dBReplicationStatusDeserializeTest() throws Exception {
+
+        String jsonString = "{\"db_status\":{\"sourceUri\":\"source\","
+                + "\"targetUri\":\"target\",\"jobName\":\"jobname\",\"database\":\"default1\",\"status\":\"SUCCESS\","
+                + "\"eventId\":20},\"table_status\":{\"Table1\":{\"sourceUri\":\"source\",\"targetUri\":\"target\","
+                + "\"jobName\":\"jobname\",\"database\":\"default1\",\"table\":\"Table1\",\"status\":\"SUCCESS\","
+                + "\"eventId\":20},\"table3\":{\"sourceUri\":\"source\",\"targetUri\":\"target\","
+                + "\"jobName\":\"jobname\", \"database\":\"Default1\",\"table\":\"table3\",\"status\":\"FAILURE\","
+                + "\"eventId\":10}, \"table2\":{\"sourceUri\":\"source\",\"targetUri\":\"target\","
+                + "\"jobName\":\"jobname\", \"database\":\"default1\",\"table\":\"table2\",\"status\":\"INIT\"}}}";
+
+        DBReplicationStatus dbStatus = new DBReplicationStatus(jsonString);
+        Assert.assertEquals(dbStatus.getDatabaseStatus().getDatabase(), "default1");
+        Assert.assertEquals(dbStatus.getDatabaseStatus().getJobName(), "jobname");
+        Assert.assertEquals(dbStatus.getDatabaseStatus().getEventId(), 20);
+
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getEventId(), 20);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getStatus(), ReplicationStatus.Status.SUCCESS);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getEventId(), -1);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getStatus(), ReplicationStatus.Status.INIT);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getEventId(), 10);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getStatus(), ReplicationStatus.Status.FAILURE);
+
+
+    }
+
+    public void wrongDBForTableTest() throws Exception {
+
+        ReplicationStatus newDbStatus = new ReplicationStatus("source", "target", "jobname",
+                "wrongDb", null, ReplicationStatus.Status.FAILURE, 20L);
+        new DBReplicationStatus(newDbStatus);
+
+        try {
+            new DBReplicationStatus(newDbStatus, tableStatuses);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot set status for table default1.table1, It does not belong to DB wrongdb");
+        }
+
+        String jsonString = "{\n" + "    \"db_status\": {\n"
+                + "        \"sourceUri\": \"source\",\n" + "        \"targetUri\": \"target\",\n"
+                + "        \"jobName\": \"jobname\",\n" + "        \"database\": \"wrongdb\",\n"
+                + "        \"status\": \"FAILURE\",\n" + "        \"eventId\": 20\n" + "    },\n"
+                + "    \"table_status\": {\"table1\": {\n" + "        \"sourceUri\": \"source\",\n"
+                + "        \"targetUri\": \"target\",\n" + "        \"jobName\": \"jobname\",\n"
+                + "        \"database\": \"default1\",\n" + "        \"table\": \"table1\",\n"
+                + "        \"status\": \"SUCCESS\",\n" + "        \"eventId\": 20\n" + "    }}\n" + "}";
+
+        try {
+            new DBReplicationStatus(jsonString);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Unable to create DBReplicationStatus from JsonString. Cannot set status for "
+                            + "table default1.table1, It does not belong to DB wrongdb");
+        }
+    }
+
+    public void updateTableStatusTest() throws Exception {
+        DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses);
+        replicationStatus.updateTableStatus(tableStatus1);
+
+        // wrong DB test
+        try {
+            replicationStatus.updateTableStatus(new ReplicationStatus("source", "target", "jobname",
+                    "wrongDB", "table2", ReplicationStatus.Status.INIT, -1L));
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update Table Status. TableDB wrongdb does not match current DB default1");
+        }
+
+        // wrong status test
+        try {
+            replicationStatus.updateTableStatus(dbReplicationStatus);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update Table Status. Table name is empty.");
+        }
+
+    }
+
+    public void updateDBStatusTest() throws Exception {
+        DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses);
+        replicationStatus.updateDbStatus(dbReplicationStatus);
+
+        // wrong DB test
+        try {
+            replicationStatus.updateDbStatus(new ReplicationStatus("source", "target", "jobname",
+                    "wrongDB", null, ReplicationStatus.Status.INIT, -1L));
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update Database Status. StatusDB wrongdb does not match current DB default1");
+        }
+
+        // wrong status test
+        try {
+            replicationStatus.updateDbStatus(tableStatus1);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update DB Status. This is table level status.");
+        }
+    }
+
+    public void updateDbStatusFromTableStatusesTest() throws Exception {
+
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "default1", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "table2", ReplicationStatus.Status.INIT, -1L);
+        ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "Table3", ReplicationStatus.Status.FAILURE, 15L);
+        ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "Table4", ReplicationStatus.Status.FAILURE, 18L);
+        Map<String, ReplicationStatus> tables = new HashMap<String, ReplicationStatus>();
+
+        tables.put("table1", table1);
+        tables.put("table2", table2);
+        tables.put("table3", table3);
+        tables.put("table4", table4);
+
+        // If there is a failure, last eventId should be lowest eventId of failed tables
+        DBReplicationStatus status = new DBReplicationStatus(dbStatus, tables);
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+        status.updateDbStatusFromTableStatuses();
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 15);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.FAILURE);
+
+        // If all tables succeed, last eventId should be highest eventId of success tables
+        table3 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table3", ReplicationStatus.Status.SUCCESS, 25L);
+        table4 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table4", ReplicationStatus.Status.SUCCESS, 22L);
+        tables.put("Table3", table3);
+        tables.put("Table4", table4);
+        status = new DBReplicationStatus(dbStatus, tables);
+        status.updateDbStatusFromTableStatuses();
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 25);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+
+        // Init tables should not change DB status.
+        Map<String, ReplicationStatus> initOnlyTables = new HashMap<String, ReplicationStatus>();
+        initOnlyTables.put("table2", table2);
+        dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "default1", null, ReplicationStatus.Status.SUCCESS, 20L);
+        status = new DBReplicationStatus(dbStatus, initOnlyTables);
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+        status.updateDbStatusFromTableStatuses();
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+
+
+    }
+
+}
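
The comments in updateDbStatusFromTableStatusesTest spell out the rollup rule being asserted: any failed table drags the database status to FAILURE with the lowest failed eventId, otherwise the highest successful eventId wins, and INIT-only tables leave the database status untouched. A self-contained sketch of that rule, using hypothetical stand-in types rather than the actual DBReplicationStatus implementation:

    import java.util.HashMap;
    import java.util.Map;

    /** Hypothetical stand-ins illustrating the rollup asserted by the test above. */
    public class DbStatusRollupSketch {

        enum Status { INIT, SUCCESS, FAILURE }

        static final class TableStatus {
            final Status status;
            final long eventId;
            TableStatus(Status status, long eventId) { this.status = status; this.eventId = eventId; }
        }

        /** Returns the rolled-up DB status, or null when only INIT tables exist (DB status unchanged). */
        static TableStatus rollup(Map<String, TableStatus> tables) {
            long minFailedId = Long.MAX_VALUE;
            long maxSuccessId = -1;
            boolean anyFailure = false;
            boolean anySuccess = false;
            for (TableStatus t : tables.values()) {
                if (t.status == Status.FAILURE) {
                    anyFailure = true;
                    minFailedId = Math.min(minFailedId, t.eventId);
                } else if (t.status == Status.SUCCESS) {
                    anySuccess = true;
                    maxSuccessId = Math.max(maxSuccessId, t.eventId);
                }
                // INIT tables are ignored: they should not change the DB status.
            }
            if (anyFailure) {
                return new TableStatus(Status.FAILURE, minFailedId);
            }
            return anySuccess ? new TableStatus(Status.SUCCESS, maxSuccessId) : null;
        }

        public static void main(String[] args) {
            Map<String, TableStatus> tables = new HashMap<String, TableStatus>();
            tables.put("table1", new TableStatus(Status.SUCCESS, 20L));
            tables.put("table3", new TableStatus(Status.FAILURE, 15L));
            tables.put("table4", new TableStatus(Status.FAILURE, 18L));
            TableStatus db = rollup(tables);
            System.out.println(db.status + " at eventId " + db.eventId);  // FAILURE at eventId 15
        }
    }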

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
new file mode 100644
index 0000000..1f44b62
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+/**
+ * Test class for Hive DR; documents a sample HiveDRTool invocation and is not annotated to run automatically.
+ */
+public class DRTest {
+    public void testHiveDr(String[] args) {
+        String[] testArgs = {
+            "-sourceMetastoreUri", "thrift://localhost:9083",
+            "-sourceDatabase", "default",
+            "-sourceTable", "test",
+            "-sourceStagingPath", "/apps/hive/tools/dr",
+            "-sourceNN", "hdfs://localhost:8020",
+            "-sourceRM", "local",
+
+            "-targetMetastoreUri", "thrift://localhost:9083",
+            "-targetStagingPath", "/apps/hive/tools/dr",
+            "-targetNN", "hdfs://localhost:8020",
+            "-targetRM", "local",
+
+            "-maxEvents", "5",
+            "-replicationMaxMaps", "1",
+            "-distcpMapBandwidth", "4",
+        };
+        HiveDRTool.main(testArgs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java
new file mode 100644
index 0000000..c89c661
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hive;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.hadoop.JailedFileSystem;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Unit tests for HiveDRStatusStore.
+ */
+@Test
+public class HiveDRStatusStoreTest {
+    private HiveDRStatusStore drStatusStore;
+    private FileSystem fileSystem = new JailedFileSystem();
+
+    public HiveDRStatusStoreTest() throws Exception {
+        EmbeddedCluster cluster =  EmbeddedCluster.newCluster("hiveReplTest");
+        Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH);
+
+        fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf());
+        if (fileSystem.exists(storePath)) {
+            fileSystem.delete(storePath, true);
+        }
+        FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION);
+        try {
+            new HiveDRStatusStore(fileSystem);
+            Assert.fail();
+        } catch (IOException ie) {
+            // Exception expected.
+            Assert.assertEquals(ie.getMessage(), "Base dir jail://hiveReplTest:00" + storePath.toUri()
+                    + " does not have correct ownership/permissions."
+                    + " Please set group to " + DRStatusStore.getStoreGroup() + " and permissions to rwxrwx---");
+        }
+        drStatusStore = new HiveDRStatusStore(fileSystem, fileSystem.getFileStatus(storePath).getGroup());
+    }
+
+    @BeforeClass
+    public void updateReplicationStatusTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "Default1", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "Table2", ReplicationStatus.Status.INIT, -1L);
+        ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "Table3", ReplicationStatus.Status.FAILURE, 15L);
+        ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table4", ReplicationStatus.Status.FAILURE, 18L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(table2);
+        replicationStatusList.add(table3);
+        replicationStatusList.add(table4);
+        replicationStatusList.add(dbStatus);
+        drStatusStore.updateReplicationStatus("jobname", replicationStatusList);
+    }
+
+    public void updateReplicationStatusNewTablesTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname2",
+                "default2", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname2",
+                "Default2", "table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "Table2", ReplicationStatus.Status.INIT, -1L);
+        ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "table3", ReplicationStatus.Status.FAILURE, 15L);
+        ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname2",
+                "Default2", "Table4", ReplicationStatus.Status.FAILURE, 18L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(table2);
+        replicationStatusList.add(table3);
+        replicationStatusList.add(table4);
+        replicationStatusList.add(dbStatus);
+
+        drStatusStore.updateReplicationStatus("jobname2", replicationStatusList);
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2");
+        Assert.assertEquals(status.getEventId(), 15);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getJobName(), "jobname2");
+        Assert.assertEquals(status.getTable(), null);
+        Assert.assertEquals(status.getSourceUri(), "source");
+
+        Iterator<ReplicationStatus> iter = drStatusStore.getTableReplicationStatusesInDb("source", "target",
+                "jobname2", "default2");
+        int size = 0;
+        while(iter.hasNext()) {
+            iter.next();
+            size++;
+        }
+        Assert.assertEquals(4, size);
+
+        table3 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "table3", ReplicationStatus.Status.SUCCESS, 25L);
+        table4 = new ReplicationStatus("source", "target", "jobname2",
+                "Default2", "table4", ReplicationStatus.Status.SUCCESS, 22L);
+        ReplicationStatus table5 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "Table5", ReplicationStatus.Status.SUCCESS, 18L);
+        ReplicationStatus db1table1 = new ReplicationStatus("source", "target", "jobname2",
+                "Default1", "Table1", ReplicationStatus.Status.SUCCESS, 18L);
+        replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table5);
+        replicationStatusList.add(table3);
+        replicationStatusList.add(table4);
+        replicationStatusList.add(db1table1);
+
+        drStatusStore.updateReplicationStatus("jobname2", replicationStatusList);
+        status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default1");
+        Assert.assertEquals(status.getEventId(), 18);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS);
+
+        status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2");
+        Assert.assertEquals(status.getEventId(), 25);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS);
+
+        iter = drStatusStore.getTableReplicationStatusesInDb("source", "target",
+                "jobname2", "default2");
+        size = 0;
+        while(iter.hasNext()) {
+            iter.next();
+            size++;
+        }
+        Assert.assertEquals(5, size);
+    }
+
+    public void getReplicationStatusDBTest() throws HiveReplicationException {
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname", "Default1");
+        Assert.assertEquals(status.getEventId(), 15);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getJobName(), "jobname");
+        Assert.assertEquals(status.getTable(), null);
+        Assert.assertEquals(status.getSourceUri(), "source");
+    }
+
+    public void checkReplicationConflictTest() throws HiveReplicationException {
+
+        try {
+            //same source, same job, same DB, null table : pass
+            drStatusStore.checkForReplicationConflict("source", "jobname", "default1", null);
+
+            //same source, same job, same DB, same table : pass
+            drStatusStore.checkForReplicationConflict("source", "jobname", "default1", "table1");
+
+            //same source, same job, different DB, null table : pass
+            drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", null);
+
+            //same source, same job, different DB, different table : pass
+            drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", "diffTable");
+
+            // same source, different job, same DB, diff table : pass
+            drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "diffTable");
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+
+        try {
+            // different source, same job, same DB, null table : fail
+            drStatusStore.checkForReplicationConflict("diffSource", "jobname", "default1", null);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Two different sources are attempting to replicate to same db default1."
+                            + " New Source = diffSource, Existing Source = source");
+        }
+
+        try {
+            // same source, different job, same DB, null table : fail
+            drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", null);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Two different jobs are attempting to replicate to same db default1."
+                            + " New Job = diffJob, Existing Job = jobname");
+        }
+
+        try {
+            // same source, different job, same DB, same table : fail
+            drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "table1");
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Two different jobs are trying to replicate to same table table1."
+                            + " New job = diffJob, Existing job = jobname");
+        }
+
+
+    }
+
+    public void deleteReplicationStatusTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "deleteJob",
+                "deleteDB", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "deleteJob",
+                "deleteDB", "Table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(dbStatus);
+        drStatusStore.updateReplicationStatus("deleteJob", replicationStatusList);
+
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "deleteJob", "deleteDB");
+        Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName());
+        Assert.assertEquals(fileSystem.exists(statusPath), true);
+
+        drStatusStore.deleteReplicationStatus("deleteJob", "deleteDB");
+        Assert.assertEquals(fileSystem.exists(statusPath), false);
+    }
+
+    public void getReplicationStatusTableTest() throws HiveReplicationException {
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "default1", "table1");
+        Assert.assertEquals(status.getEventId(), 20);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS);
+        Assert.assertEquals(status.getTable(), "table1");
+
+        status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "Default1", "Table2");
+        Assert.assertEquals(status.getEventId(), -1);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT);
+        Assert.assertEquals(status.getTable(), "table2");
+
+        status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "default1", "Table3");
+        Assert.assertEquals(status.getEventId(), 15);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getTable(), "table3");
+
+        status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "default1", "table4");
+        Assert.assertEquals(status.getEventId(), 18);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getTable(), "table4");
+    }
+
+    public void getTableReplicationStatusesInDbTest() throws HiveReplicationException {
+        Iterator<ReplicationStatus> iter = drStatusStore.getTableReplicationStatusesInDb("source", "target",
+                "jobname", "Default1");
+        int size = 0;
+        while(iter.hasNext()) {
+            size++;
+            ReplicationStatus status = iter.next();
+            if (status.getTable().equals("table3")) {
+                Assert.assertEquals(status.getEventId(), 15);
+                Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+                Assert.assertEquals(status.getTable(), "table3");
+            }
+        }
+        Assert.assertEquals(4, size);
+    }
+
+    public void fileRotationTest() throws Exception {
+        // initialize replication status store for db default3.
+        // This should init with eventId = -1 and status = INIT
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname3", "default3");
+        Assert.assertEquals(status.getEventId(), -1);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT);
+
+        // update status 5 times resulting in 6 files : latest.json + five rotated files
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3",
+                "Default3", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname3",
+                "default3", "Table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(dbStatus);
+
+        for(int i=0; i<5; i++) {
+            Thread.sleep(2000);
+            drStatusStore.updateReplicationStatus("jobname3", replicationStatusList);
+        }
+
+        status = drStatusStore.getReplicationStatus("source", "target", "jobname3", "default3");
+        Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName());
+        RemoteIterator<LocatedFileStatus> iter = fileSystem.listFiles(statusPath, false);
+        Assert.assertEquals(getRemoteIterSize(iter), 6);
+
+        drStatusStore.rotateStatusFiles(statusPath, 3, 10000000);
+        iter = fileSystem.listFiles(statusPath, false);
+        Assert.assertEquals(getRemoteIterSize(iter), 6);
+
+        drStatusStore.rotateStatusFiles(statusPath, 3, 6000);
+        iter = fileSystem.listFiles(statusPath, false);
+        Assert.assertEquals(getRemoteIterSize(iter), 3);
+    }
+
+    public void wrongJobNameTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3",
+                "Default3", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(dbStatus);
+
+        try {
+            drStatusStore.updateReplicationStatus("jobname2", replicationStatusList);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            // Expected exception due to jobname mismatch
+        }
+    }
+
+    @AfterClass
+    public void cleanUp() throws IOException {
+        fileSystem.delete(new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH), true);
+    }
+
+    private int getRemoteIterSize(RemoteIterator<LocatedFileStatus> iter) throws IOException {
+        int size = 0;
+        while(iter.hasNext()) {
+            iter.next();
+            size++;
+        }
+        return size;
+    }
+
+
+}
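
fileRotationTest is consistent with a rotation policy of the form "delete archived status files older than the age limit, but always retain the newest N": with a very large age limit all six files survive, while a 6-second limit prunes the directory down to three. A rough, hypothetical sketch of such a policy against the Hadoop FileSystem API (not the actual HiveDRStatusStore.rotateStatusFiles):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Comparator;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Hypothetical rotation helper; prunes old status files but keeps the newest minFilesToKeep. */
    public final class StatusFileRotationSketch {

        private StatusFileRotationSketch() {}

        public static void rotate(FileSystem fs, Path statusDir, int minFilesToKeep, long maxFileAgeMillis)
            throws IOException {
            FileStatus[] files = fs.listStatus(statusDir);
            // Newest first, by modification time.
            Arrays.sort(files, new Comparator<FileStatus>() {
                public int compare(FileStatus a, FileStatus b) {
                    return Long.compare(b.getModificationTime(), a.getModificationTime());
                }
            });
            long now = System.currentTimeMillis();
            for (int i = minFilesToKeep; i < files.length; i++) {
                if (now - files[i].getModificationTime() > maxFileAgeMillis) {
                    fs.delete(files[i].getPath(), false);   // prune archived copies past the age limit
                }
            }
        }
    }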

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java
new file mode 100644
index 0000000..a02639c
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hive;
+
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for ReplicationStatus.
+ */
+@Test
+public class ReplicationStatusTest {
+
+    private ReplicationStatus dbStatus, tableStatus;
+
+    public ReplicationStatusTest() {}
+
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "default1", null, ReplicationStatus.Status.INIT, 0L);
+        tableStatus = new ReplicationStatus("source", "target", "jobname",
+                "testDb", "Table1", ReplicationStatus.Status.SUCCESS, 0L);
+    }
+
+    public void replicationStatusSerializeTest() throws Exception {
+        String expected = "{\n    \"sourceUri\": \"source\",\n"
+                + "    \"targetUri\": \"target\",\n    \"jobName\": \"jobname\",\n"
+                + "    \"database\": \"testdb\",\n    \"table\": \"table1\",\n"
+                + "    \"status\": \"SUCCESS\",\n    \"eventId\": 0\n}";
+        String actual = tableStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+
+        expected = "{\n    \"sourceUri\": \"source\",\n    \"targetUri\": \"target\",\n"
+                + "    \"jobName\": \"jobname\",\n    \"database\": \"default1\",\n"
+                + "    \"status\": \"INIT\",\n    \"eventId\": 0\n}";
+        actual = dbStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+    }
+
+    public void replicationStatusDeserializeTest() throws Exception {
+        String tableInput = "{\n    \"sourceUri\": \"source\",\n"
+                + "    \"targetUri\": \"target\",\n    \"jobName\": \"testJob\",\n"
+                + "    \"database\": \"Test1\",\n    \"table\": \"table1\",\n"
+                + "    \"status\": \"SUCCESS\",\n    \"eventId\": 0\n}";
+        String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n"
+                + "    \"database\": \"default1\", \"status\": \"FAILURE\","
+                + "   \"eventId\": 27, \"statusLog\": \"testLog\"}";
+
+        ReplicationStatus newDbStatus = new ReplicationStatus(dbInput);
+        ReplicationStatus newTableStatus = new ReplicationStatus(tableInput);
+
+        Assert.assertEquals(newDbStatus.getTable(), null);
+        Assert.assertEquals(newDbStatus.getEventId(), 27);
+        Assert.assertEquals(newDbStatus.getDatabase(), "default1");
+        Assert.assertEquals(newDbStatus.getLog(), "testLog");
+        Assert.assertEquals(newDbStatus.getStatus(), ReplicationStatus.Status.FAILURE);
+
+
+        Assert.assertEquals(newTableStatus.getTable(), "table1");
+        Assert.assertEquals(newTableStatus.getEventId(), 0);
+        Assert.assertEquals(newTableStatus.getDatabase(), "test1");
+        Assert.assertEquals(newTableStatus.getJobName(), "testJob");
+
+        // no table, no eventId, no log
+        dbInput = "{\n    \"sourceUri\": \"source\",\n"
+                + "    \"targetUri\": \"target\",\n    \"jobName\": \"testJob\",\n"
+                + "    \"database\": \"Test1\",\n"
+                + "    \"status\": \"SUCCESS\"\n}";
+        newDbStatus = new ReplicationStatus(dbInput);
+
+        Assert.assertEquals(newDbStatus.getDatabase(), "test1");
+        Assert.assertEquals(newDbStatus.getTable(), null);
+        Assert.assertEquals(newDbStatus.getEventId(), -1);
+        Assert.assertEquals(newDbStatus.getLog(), null);
+
+    }
+
+    public void invalidEventIdTest() throws Exception {
+        String tableInput = "{\n    \"sourceUri\": \"source\",\n"
+                + "    \"targetUri\": \"target\",\n    \"jobName\": \"testJob\",\n"
+                + "    \"database\": \"test1\",\n    \"table\": \"table1\",\n"
+                + "    \"status\": \"SUCCESS\",\n    \"eventId\": -100\n}";
+
+        ReplicationStatus newTableStatus = new ReplicationStatus(tableInput);
+        Assert.assertEquals(newTableStatus.getEventId(), -1);
+
+        newTableStatus.setEventId(-200);
+        Assert.assertEquals(newTableStatus.getEventId(), -1);
+
+        String expected = "{\n    \"sourceUri\": \"source\",\n"
+                + "    \"targetUri\": \"target\",\n    \"jobName\": \"testJob\",\n"
+                + "    \"database\": \"test1\",\n    \"table\": \"table1\",\n"
+                + "    \"status\": \"SUCCESS\",\n    \"eventId\": -1\n}";
+        String actual = newTableStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+
+        newTableStatus.setEventId(50);
+        Assert.assertEquals(newTableStatus.getEventId(), 50);
+    }
+
+    public void invalidStatusTest() throws Exception {
+
+        String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n"
+                + "    \"database\": \"default1\", \"status\": \"BLAH\","
+                + "   \"eventId\": 27, \"statusLog\": \"testLog\"}";
+
+        try {
+            new ReplicationStatus(dbInput);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Unable to deserialize jsonString to ReplicationStatus. Invalid status BLAH");
+        }
+    }
+
+
+}
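
invalidEventIdTest pins down one small contract: negative event ids collapse to -1, whether they arrive via JSON or via the setter. A trivial, illustrative setter with that behavior (not the actual ReplicationStatus code):

    /** Illustrative only: negative event ids collapse to -1, as asserted in invalidEventIdTest. */
    public class EventIdSketch {
        private long eventId = -1;

        public void setEventId(long eventId) {
            // Anything below zero is treated as "no event replicated yet".
            this.eventId = eventId < 0 ? -1 : eventId;
        }

        public long getEventId() {
            return eventId;
        }
    }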

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
index 824e6f5..de0f748 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
@@ -17,30 +17,27 @@
   limitations under the License.
   -->
 
-<process name="##name##" xmlns="uri:falcon:process:0.1">
+<process name="##falcon.recipe.job.name##" xmlns="uri:falcon:process:0.1">
     <clusters>
         <!--  source  -->
-        <cluster name="##src.cluster.name##">
-            <validity end="##src.cluster.validity.end##" start="##src.cluster.validity.start##"/>
+        <cluster name="##falcon.recipe.cluster.name##">
+            <validity end="##falcon.recipe.cluster.validity.end##" start="##falcon.recipe.cluster.validity.start##"/>
         </cluster>
     </clusters>
 
+    <tags>_falcon_mirroring_type=HDFS</tags>
+
     <parallel>1</parallel>
     <!-- Dir replication needs to run only once to catch up -->
     <order>LAST_ONLY</order>
-    <frequency>##process.frequency##</frequency>
+    <frequency>##falcon.recipe.frequency##</frequency>
     <timezone>UTC</timezone>
 
     <properties>
         <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
-        <property name="##process.property2.name##" value="##process.property2.value##"/>
-        <property name="##process.property3.name##" value="##process.property3.value##"/>
-        <property name="##process.property4.name##" value="##process.property4.value##"/>
-        <property name="##process.property5.name##" value="##process.property5.value##"/>
-        <property name="##process.property6.name##" value="##process.property6.value##"/>
-        <property name="##process.property7.name##" value="##process.property7.value##"/>
     </properties>
 
-    <workflow name="##workflow.name##" engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
-    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
-</process>
+    <workflow name="##falcon.recipe.workflow.name##" engine="oozie" path="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml" lib="##workflow.lib.path##"/>
+    <retry policy="##falcon.recipe.retry.policy##" delay="##falcon.recipe.retry.delay##" attempts="3"/>
+    <ACL/>
+</process>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
index 145d489..d6a4ee9 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
@@ -47,28 +47,78 @@
                     <name>oozie.launcher.oozie.libpath</name>
                     <value>${wf:conf("falcon.libpath")}</value>
                 </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${drSourceClusterFS},${drTargetClusterFS}</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
             <arg>-maxMaps</arg>
-            <arg>${maxMaps}</arg>
+            <arg>${distcpMaxMaps}</arg>
             <arg>-mapBandwidth</arg>
-            <arg>${mapBandwidth}</arg>
+            <arg>${distcpMapBandwidth}</arg>
             <arg>-sourcePaths</arg>
-            <arg>${nameNode}${drSourceDir}</arg>
+            <arg>${drSourceDir}</arg>
             <arg>-targetPath</arg>
             <arg>${drTargetClusterFS}${drTargetDir}</arg>
             <arg>-falconFeedStorageType</arg>
             <arg>FILESYSTEM</arg>
+            <arg>-availabilityFlag</arg>
+            <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+            <arg>-counterLogDir</arg>
+            <arg>${logDir}/job-${nominalTime}</arg>
         </java>
+        <ok to="success"/>
+        <error to="failure"/>
+    </action>
+    <decision name="success">
+        <switch>
+            <case to="successAlert">
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="end"/>
+        </switch>
+    </decision>
+    <decision name="failure">
+        <switch>
+            <case to="failureAlert">
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="fail"/>
+        </switch>
+    </decision>
+    <action name="successAlert">
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>INFO: HDFS DR workflow ${entityName} completed successfully</subject>
+            <body>
+                The HDFS DR workflow ${wf:id()} is successful.
+                Source      =   ${drSourceDir}
+                Target      =   ${drTargetClusterFS}${drTargetDir}
+            </body>
+        </email>
+        <ok to="end"/>
+        <error to="end"/>
+    </action>
+    <action name="failureAlert">
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>ERROR: HDFS DR workflow ${entityName} failed</subject>
+            <body>
+                The workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+                Source      =   ${drSourceDir}
+                Target      =   ${drTargetClusterFS}${drTargetDir}
+            </body>
+        </email>
         <ok to="end"/>
         <error to="fail"/>
     </action>
     <kill name="fail">
         <message>
-            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+            Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
         </message>
     </kill>
-    <end name='end'/>
+    <end name="end"/>
 </workflow-app>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
index 19b8459..64ab6b8 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
@@ -19,47 +19,59 @@
 ##### NOTE: This is a TEMPLATE file which can be copied and edited
 
 ##### Recipe properties
-falcon.recipe.name=hdfs-replication
-
+##### Unique recipe job name
+falcon.recipe.name=sales-monthly
 
 ##### Workflow properties
-
 falcon.recipe.workflow.name=hdfs-dr-workflow
 # Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS
-falcon.recipe.workflow.path=/recipes/hdfs-replication/hdfs-replication-workflow.xml
+falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml
 # Provide Wf lib absolute path. This can be HDFS or local FS path. If libs are on local FS it will be copied to HDFS
 #falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib
 
 ##### Cluster properties
+# Cluster where job should run
+falcon.recipe.cluster.name=primaryCluster
+# Change the cluster HDFS write endpoint here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2015-03-13T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
 
-# Change the src cluster name here
-falcon.recipe.src.cluster.name=test
-# Change the src cluster hdfs write end point here. This is mandatory.
-falcon.recipe.src.cluster.hdfs.writeEndPoint=hdfs://sandbox.hortonworks.com:8020
-# Change the src cluster validity start time here
-falcon.recipe.src.cluster.validity.start=2014-10-01T00:00Z
-# Change the src cluster validity end time here
-falcon.recipe.src.cluster.validity.end=2016-12-30T00:00Z
+##### Scheduling properties
+# Change the recipe frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(5)
 
+##### Tag properties - An optional list of comma-separated tags as key=value pairs
+##### Uncomment to add tags
+#falcon.recipe.tags=
 
-##### Scheduling properties
+##### Retry policy properties
 
-# Change the process here. Valid frequency type are minutes, hours, days, months
-falcon.recipe.process.frequency=minutes(60)
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
 
+##### ACL properties - Uncomment and change ACL if authorization is enabled
+
+falcon.recipe.acl.owner=ambari-qa
+falcon.recipe.acl.group=users
+falcon.recipe.acl.permission=0x755
+falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM
 
 ##### Custom Job properties
 
-# Specify property names and values for properties defined in recipe template
-falcon.recipe.process.property2.name=drSourceDir
-falcon.recipe.process.property2.value=/falcon/test/srcCluster/input
-falcon.recipe.process.property3.name=drTargetClusterFS
-falcon.recipe.process.property3.value=hdfs://sandbox.hortonworks.com:8020
-falcon.recipe.process.property4.name=drTargetDir
-falcon.recipe.process.property4.value=/falcon/test/targetCluster/input
-falcon.recipe.process.property5.name=drTargetCluster
-falcon.recipe.process.property5.value=backupCluster
-falcon.recipe.process.property6.name=maxMaps
-falcon.recipe.process.property6.value=5
-falcon.recipe.process.property7.name=mapBandwidth
-falcon.recipe.process.property7.value=100
+# Specify one or more comma-separated source directories
+drSourceDir=/user/hrt_qa/dr/test/primaryCluster/input
+drSourceClusterFS=hdfs://240.0.0.10:8020
+drTargetDir=/user/hrt_qa/dr/test/backupCluster/input
+drTargetClusterFS=hdfs://240.0.0.11:8020
+
+# Change it to specify the maximum number of mappers for DistCp
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB for each mapper in DistCp
+distcpMapBandwidth=100
+
+##### Email on failure
+drNotificationReceivers=NA
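
The properties above pair with the ##falcon.recipe.*## placeholders in hdfs-replication-template.xml earlier in this patch (some keys are renamed on the way, e.g. falcon.recipe.name feeds ##falcon.recipe.job.name##). Conceptually the recipe tool expands each ##key## from these properties; a rough, hypothetical sketch of just that substitution step (not the actual Falcon recipe tool):

    import java.util.Properties;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    /** Hypothetical illustration of expanding ##key## placeholders from a recipe .properties file. */
    public final class RecipeTemplateSketch {

        private static final Pattern PLACEHOLDER = Pattern.compile("##([^#]+)##");

        private RecipeTemplateSketch() {}

        public static String expand(String template, Properties props) {
            Matcher m = PLACEHOLDER.matcher(template);
            StringBuffer out = new StringBuffer();
            while (m.find()) {
                // Leave unknown placeholders untouched so missing properties are easy to spot.
                String value = props.getProperty(m.group(1), m.group(0));
                m.appendReplacement(out, Matcher.quoteReplacement(value));
            }
            m.appendTail(out);
            return out.toString();
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            // The real tool derives the job name from falcon.recipe.name; set it directly here for illustration.
            props.setProperty("falcon.recipe.job.name", "sales-monthly");
            props.setProperty("falcon.recipe.cluster.name", "primaryCluster");
            String template = "<process name=\"##falcon.recipe.job.name##\">"
                    + "<cluster name=\"##falcon.recipe.cluster.name##\"/></process>";
            System.out.println(expand(template, props));
        }
    }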

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/README.txt
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/README.txt b/addons/recipes/hive-disaster-recovery/README.txt
new file mode 100644
index 0000000..ab393b1
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/README.txt
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Hive Metastore Disaster Recovery Recipe
+
+Overview
+This recipe replicates Hive metadata and data from one Hadoop cluster
+to another Hadoop cluster.
+It piggybacks on the Falcon replication solution, which uses the DistCp tool.
+
+Use Case
+*
+*
+
+Limitations
+*

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/pom.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/pom.xml b/addons/recipes/hive-disaster-recovery/pom.xml
new file mode 100644
index 0000000..1732907
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.falcon.recipes</groupId>
+    <artifactId>falcon-hive-replication-recipe</artifactId>
+    <version>0.7-SNAPSHOT</version>
+    <description>Apache Falcon Hive Disaster Recovery Recipe</description>
+    <name>Apache Falcon Sample Hive Disaster Recovery Recipe</name>
+    <packaging>jar</packaging>
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
new file mode 100644
index 0000000..3afbef0
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##name##" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <!--  source  -->
+        <cluster name="##cluster.name##">
+            <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/>
+        </cluster>
+    </clusters>
+
+    <tags>_falcon_mirroring_type=HIVE</tags>
+
+    <parallel>1</parallel>
+    <!-- Replication needs to run only once to catch up -->
+    <order>LAST_ONLY</order>
+    <frequency>##process.frequency##</frequency>
+    <timezone>UTC</timezone>
+
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+    </properties>
+
+    <workflow name="##workflow.name##" engine="oozie"
+              path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
+    <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
+    <ACL/>
+</process>
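
For illustration, a minimal sketch of what this template could look like once the ##...## placeholders are substituted, using the values from the sample hive-disaster-recovery-secure.properties shown further below (the lib attribute is left as a placeholder because the sample properties do not set it; the entity actually generated by the recipe tool may differ in detail):

<process name="hive-disaster-recovery" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="backupCluster">
            <validity end="2016-12-30T00:00Z" start="2014-10-01T00:00Z"/>
        </cluster>
    </clusters>

    <tags>_falcon_mirroring_type=HIVE</tags>

    <parallel>1</parallel>
    <order>LAST_ONLY</order>
    <frequency>minutes(60)</frequency>
    <timezone>UTC</timezone>

    <properties>
        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
    </properties>

    <workflow name="hive-dr-workflow" engine="oozie"
              path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
    <retry policy="periodic" delay="minutes(30)" attempts="3"/>
    <ACL/>
</process>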

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
new file mode 100644
index 0000000..7362c2e
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
@@ -0,0 +1,401 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'>
+    <credentials>
+        <credential name='hive_src_credentials' type='hcat'>
+            <property>
+                <name>hcat.metastore.uri</name>
+                <value>${sourceMetastoreUri}</value>
+            </property>
+            <property>
+                <name>hcat.metastore.principal</name>
+                <value>${sourceHiveMetastoreKerberosPrincipal}</value>
+            </property>
+        </credential>
+        <credential name='hive_tgt_credentials' type='hcat'>
+            <property>
+                <name>hcat.metastore.uri</name>
+                <value>${targetMetastoreUri}</value>
+            </property>
+            <property>
+                <name>hcat.metastore.principal</name>
+                <value>${targetHiveMetastoreKerberosPrincipal}</value>
+            </property>
+        </credential>
+        <credential name="hive2_src_credentials" type="hive2">
+            <property>
+                <name>hive2.server.principal</name>
+                <value>${sourceHive2KerberosPrincipal}</value>
+            </property>
+            <property>
+                <name>hive2.jdbc.url</name>
+                <value>jdbc:${sourceHiveServer2Uri}/${sourceDatabase}</value>
+            </property>
+        </credential>
+        <credential name="hive2_tgt_credentials" type="hive2">
+            <property>
+                <name>hive2.server.principal</name>
+                <value>${targetHive2KerberosPrincipal}</value>
+            </property>
+            <property>
+                <name>hive2.jdbc.url</name>
+                <value>jdbc:${targetHiveServer2Uri}/${sourceDatabase}</value>
+            </property>
+        </credential>
+    </credentials>
+    <start to='last-event'/>
+    <action name="last-event" cred="hive_tgt_credentials">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-sourceNNKerberosPrincipal</arg>
+            <arg>${sourceNNKerberosPrincipal}</arg>
+            <arg>-sourceHiveMetastoreKerberosPrincipal</arg>
+            <arg>${sourceHiveMetastoreKerberosPrincipal}</arg>
+            <arg>-sourceHive2KerberosPrincipal</arg>
+            <arg>${sourceHive2KerberosPrincipal}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-targetNNKerberosPrincipal</arg>
+            <arg>${targetNNKerberosPrincipal}</arg>
+            <arg>-targetHiveMetastoreKerberosPrincipal</arg>
+            <arg>${targetHiveMetastoreKerberosPrincipal}</arg>
+            <arg>-targetHive2KerberosPrincipal</arg>
+            <arg>${targetHive2KerberosPrincipal}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-clusterForJobNNKerberosPrincipal</arg>
+            <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg>
+            <arg>-executionStage</arg>
+            <arg>lastevents</arg>
+        </java>
+        <ok to="export-dr-replication"/>
+        <error to="failure"/>
+    </action>
+    <!-- Export Replication action -->
+    <action name="export-dr-replication" cred="hive_src_credentials,hive2_src_credentials">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-replicationMaxMaps</arg>
+            <arg>${replicationMaxMaps}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-sourceNNKerberosPrincipal</arg>
+            <arg>${sourceNNKerberosPrincipal}</arg>
+            <arg>-sourceHiveMetastoreKerberosPrincipal</arg>
+            <arg>${sourceHiveMetastoreKerberosPrincipal}</arg>
+            <arg>-sourceHive2KerberosPrincipal</arg>
+            <arg>${sourceHive2KerberosPrincipal}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-targetNNKerberosPrincipal</arg>
+            <arg>${targetNNKerberosPrincipal}</arg>
+            <arg>-targetHiveMetastoreKerberosPrincipal</arg>
+            <arg>${targetHiveMetastoreKerberosPrincipal}</arg>
+            <arg>-targetHive2KerberosPrincipal</arg>
+            <arg>${targetHive2KerberosPrincipal}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-clusterForJobNNKerberosPrincipal</arg>
+            <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg>
+            <arg>-executionStage</arg>
+            <arg>export</arg>
+        </java>
+        <ok to="import-dr-replication"/>
+        <error to="failure"/>
+    </action>
+    <!-- Import Replication action -->
+    <action name="import-dr-replication" cred="hive_tgt_credentials,hive2_tgt_credentials">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>distcp,hive,hive2,hcatalog</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+                <property>
+                    <name>mapreduce.job.hdfs-servers</name>
+                    <value>${sourceNN},${targetNN}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+            <arg>-Dmapred.job.queue.name=${queueName}</arg>
+            <arg>-Dmapred.job.priority=${jobPriority}</arg>
+            <arg>-falconLibPath</arg>
+            <arg>${wf:conf("falcon.libpath")}</arg>
+            <arg>-replicationMaxMaps</arg>
+            <arg>${replicationMaxMaps}</arg>
+            <arg>-distcpMaxMaps</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-sourceCluster</arg>
+            <arg>${sourceCluster}</arg>
+            <arg>-sourceMetastoreUri</arg>
+            <arg>${sourceMetastoreUri}</arg>
+            <arg>-sourceHiveServer2Uri</arg>
+            <arg>${sourceHiveServer2Uri}</arg>
+            <arg>-sourceDatabase</arg>
+            <arg>${sourceDatabase}</arg>
+            <arg>-sourceTable</arg>
+            <arg>${sourceTable}</arg>
+            <arg>-sourceStagingPath</arg>
+            <arg>${sourceStagingPath}</arg>
+            <arg>-sourceNN</arg>
+            <arg>${sourceNN}</arg>
+            <arg>-sourceNNKerberosPrincipal</arg>
+            <arg>${sourceNNKerberosPrincipal}</arg>
+            <arg>-sourceHiveMetastoreKerberosPrincipal</arg>
+            <arg>${sourceHiveMetastoreKerberosPrincipal}</arg>
+            <arg>-sourceHive2KerberosPrincipal</arg>
+            <arg>${sourceHive2KerberosPrincipal}</arg>
+            <arg>-targetCluster</arg>
+            <arg>${targetCluster}</arg>
+            <arg>-targetMetastoreUri</arg>
+            <arg>${targetMetastoreUri}</arg>
+            <arg>-targetHiveServer2Uri</arg>
+            <arg>${targetHiveServer2Uri}</arg>
+            <arg>-targetStagingPath</arg>
+            <arg>${targetStagingPath}</arg>
+            <arg>-targetNN</arg>
+            <arg>${targetNN}</arg>
+            <arg>-targetNNKerberosPrincipal</arg>
+            <arg>${targetNNKerberosPrincipal}</arg>
+            <arg>-targetHiveMetastoreKerberosPrincipal</arg>
+            <arg>${targetHiveMetastoreKerberosPrincipal}</arg>
+            <arg>-targetHive2KerberosPrincipal</arg>
+            <arg>${targetHive2KerberosPrincipal}</arg>
+            <arg>-maxEvents</arg>
+            <arg>${maxEvents}</arg>
+            <arg>-distcpMapBandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-clusterForJobRun</arg>
+            <arg>${clusterForJobRun}</arg>
+            <arg>-clusterForJobRunWriteEP</arg>
+            <arg>${clusterForJobRunWriteEP}</arg>
+            <arg>-clusterForJobNNKerberosPrincipal</arg>
+            <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-drJobName</arg>
+            <arg>${drJobName}-${nominalTime}</arg>
+            <arg>-executionStage</arg>
+            <arg>import</arg>
+        </java>
+        <ok to="success"/>
+        <error to="failure"/>
+    </action>
+    <decision name="success">
+        <switch>
+            <case to="successAlert">
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="end"/>
+        </switch>
+    </decision>
+    <decision name="failure">
+        <switch>
+            <case to="failureAlert">
+                ${drNotificationReceivers ne 'NA'}
+            </case>
+            <default to="fail"/>
+        </switch>
+    </decision>
+    <action name="successAlert">
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
+            <body>
+                The Hive DR workflow ${wf:id()} is successful.
+                Source          = ${sourceCluster}
+                Target          = ${targetCluster}
+                DB Name         = ${sourceDatabase}
+                Table Name      = ${sourceTable}
+            </body>
+        </email>
+        <ok to="end"/>
+        <error to="end"/>
+    </action>
+    <action name="failureAlert">
+        <email xmlns="uri:oozie:email-action:0.2">
+            <to>${drNotificationReceivers}</to>
+            <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
+            <body>
+                The Hive DR workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+                Source          = ${sourceCluster}
+                Target          = ${targetCluster}
+                DB Name         = ${sourceDatabase}
+                Table Name      = ${sourceTable}
+            </body>
+        </email>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>
+            Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
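
All three java actions above launch the same org.apache.falcon.hive.HiveDRTool main class; they differ mainly in the credentials they carry and in the -executionStage argument they pass (lastevents, export, import). As a concrete example of how the EL expressions resolve, with the sample values from hive-disaster-recovery-secure.properties below (sourceHiveServer2Uri=hive2://localhost:10000, sourceDatabase=default, sourceHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM) the hive2_src_credentials entry would evaluate to roughly:

<credential name="hive2_src_credentials" type="hive2">
    <property>
        <name>hive2.server.principal</name>
        <value>hive/_HOST@EXAMPLE.COM</value>
    </property>
    <property>
        <!-- jdbc:${sourceHiveServer2Uri}/${sourceDatabase} -->
        <name>hive2.jdbc.url</name>
        <value>jdbc:hive2://localhost:10000/default</value>
    </property>
</credential>

Note that hive2_tgt_credentials also builds its JDBC URL from ${sourceDatabase}, presumably because the replicated database keeps the same name on the target cluster.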

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
new file mode 100644
index 0000000..b2d670a
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##### NOTE: This is a TEMPLATE file which can be copied and edited
+
+##### Recipe properties
+falcon.recipe.name=hive-disaster-recovery
+
+
+##### Workflow properties
+falcon.recipe.workflow.name=hive-dr-workflow
+# Provide the workflow absolute path. This can be an HDFS or local FS path. If the workflow is on the local FS it will be copied to HDFS
+falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-secure-workflow.xml
+
+##### Cluster properties
+
+# Change the cluster name where replication job should run here
+falcon.recipe.cluster.name=backupCluster
+# Change the cluster hdfs write end point here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2014-10-01T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
+# Change the cluster namenode kerberos principal. This is mandatory on secure clusters.
+falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM
+
+##### Scheduling properties
+
+# Change the process frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(60)
+
+##### Retry policy properties
+
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
+
+##### Tag properties - An optional comma separated list of tags, specified as key=value pairs
+##### Uncomment to add tags
+#falcon.recipe.tags=owner=landing,pipeline=adtech
+
+##### ACL properties - Uncomment and change ACL if authorization is enabled
+
+#falcon.recipe.acl.owner=testuser
+#falcon.recipe.acl.group=group
+#falcon.recipe.acl.permission=0x755
+
+##### Custom Job properties
+
+##### Source Cluster DR properties
+sourceCluster=primaryCluster
+sourceMetastoreUri=thrift://localhost:9083
+sourceHiveServer2Uri=hive2://localhost:10000
+# For DB level replication, to replicate multiple databases specify a comma separated list of databases
+sourceDatabase=default
+# For DB level replication specify * for sourceTable.
+# For table level replication, to replicate multiple tables specify a comma separated list of tables
+sourceTable=testtable_dr
+sourceStagingPath=/apps/hive/tools/dr
+sourceNN=hdfs://localhost:8020
+# Specify the Kerberos principals required to access the source namenode and hive servers; optional on a non-secure cluster.
+sourceNNKerberosPrincipal=nn/_HOST@EXAMPLE.COM
+sourceHiveMetastoreKerberosPrincipal=hive/_HOST@EXAMPLE.COM
+sourceHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM
+
+##### Target Cluster DR properties
+targetCluster=backupCluster
+targetMetastoreUri=thrift://localhost:9083
+targetHiveServer2Uri=hive2://localhost:10000
+targetStagingPath=/apps/hive/tools/dr
+targetNN=hdfs://localhost:8020
+# Specify the Kerberos principals required to access the target namenode and hive servers; optional on a non-secure cluster.
+targetNNKerberosPrincipal=nn/_HOST@EXAMPLE.COM
+targetHiveMetastoreKerberosPrincipal=hive/_HOST@EXAMPLE.COM
+targetHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM
+
+# Caps the maximum number of events processed each time the job runs. Set it to a value that suits your bandwidth limit.
+# Setting it to -1 will process all the events but can hog the bandwidth. Use it judiciously!
+maxEvents=-1
+# Change it to specify the maximum number of mappers for replication
+replicationMaxMaps=5
+# Change it to specify the maximum number of mappers for DistCP
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB for each mapper in DistCP
+distcpMapBandwidth=100
+
+##### Email on failure
+drNotificationReceivers=NA
\ No newline at end of file
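
As the comments above indicate, the same properties cover both table level and DB level replication. A minimal sketch of the source section for DB level replication of two databases (the database names here are hypothetical):

sourceDatabase=salesdb,marketingdb
# '*' replicates every table in the listed databases
sourceTable=*

With distcpMaxMaps=1 and distcpMapBandwidth=100 as above, each run copies data through a single DistCp mapper capped at roughly 100 MB of bandwidth per second; the aggregate ceiling scales as maps multiplied by per-map bandwidth.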

http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
new file mode 100644
index 0000000..3afbef0
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##name##" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <!--  source  -->
+        <cluster name="##cluster.name##">
+            <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/>
+        </cluster>
+    </clusters>
+
+    <tags>_falcon_mirroring_type=HIVE</tags>
+
+    <parallel>1</parallel>
+    <!-- Replication needs to run only once to catch up -->
+    <order>LAST_ONLY</order>
+    <frequency>##process.frequency##</frequency>
+    <timezone>UTC</timezone>
+
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+    </properties>
+
+    <workflow name="##workflow.name##" engine="oozie"
+              path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
+    <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
+    <ACL/>
+</process>