You are viewing a plain text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/11 10:20:03 UTC
[3/5] falcon git commit: FALCON-1188 Falcon support for Hive
Replication. Contributed by Venkat Ranganathan.
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/resources/log4j.xml b/addons/hivedr/src/main/resources/log4j.xml
new file mode 100644
index 0000000..f83a9a9
--- /dev/null
+++ b/addons/hivedr/src/main/resources/log4j.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!--
+ This log4j configuration is used for Falcon packaging only.
+ All log output is written to a single console appender on System.out.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <!-- %L (caller line number) needs location info, which is comparatively expensive to compute. -->
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <!-- additivity="false" on each logger keeps events from being logged a second time by root. -->
+    <logger name="org.apache.falcon" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="org.apache.hadoop" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <!-- Same settings as org.apache.hadoop today; presumably kept so Hive verbosity can be tuned on its own. -->
+    <logger name="org.apache.hadoop.hive" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <!-- <level> replaces the deprecated <priority> element, matching the loggers above. -->
+    <root>
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </root>
+
+</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
new file mode 100644
index 0000000..bfeca8d
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hive;
+
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DBReplicationStatus;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for DBReplicationStatus.
+ *
+ * <p>Covers JSON serialization/deserialization, rejection of table statuses that belong to a
+ * different database, single-status updates, and rolling table statuses up into the DB status.
+ * Database and table names are deliberately supplied in mixed case; the assertions expect them
+ * to come back lower-cased.</p>
+ */
+@Test
+public class DBReplicationStatusTest {
+
+    /** Table statuses shared by several tests; populated once in {@link #prepare()}. */
+    private final Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>();
+    private ReplicationStatus dbReplicationStatus;
+    private ReplicationStatus tableStatus1;
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        dbReplicationStatus = new ReplicationStatus("source", "target", "jobname",
+                "Default1", null, ReplicationStatus.Status.FAILURE, 20L);
+        tableStatus1 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "Table1", ReplicationStatus.Status.SUCCESS, 20L);
+        tableStatuses.put("Table1", tableStatus1);
+    }
+
+    public void dBReplicationStatusSerializeTest() throws Exception {
+        DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses);
+
+        // NOTE(review): pretty-printed JSON is usually indented by more than one space per level; the
+        // single spaces in these literals may be an artifact of how this listing was rendered - verify
+        // against the repository copy.
+        String expected = "{\n" + " \"db_status\": {\n"
+                + " \"sourceUri\": \"source\",\n" + " \"targetUri\": \"target\",\n"
+                + " \"jobName\": \"jobname\",\n" + " \"database\": \"default1\",\n"
+                + " \"status\": \"FAILURE\",\n" + " \"eventId\": 20\n" + " },\n"
+                + " \"table_status\": {\"table1\": {\n" + " \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n" + " \"jobName\": \"jobname\",\n"
+                + " \"database\": \"default1\",\n" + " \"table\": \"table1\",\n"
+                + " \"status\": \"SUCCESS\",\n" + " \"eventId\": 20\n" + " }}\n" + "}";
+        String actual = replicationStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+    }
+
+    public void dBReplicationStatusDeserializeTest() throws Exception {
+        // table2 carries no eventId in the JSON; it is expected to come back as -1.
+        String jsonString = "{\"db_status\":{\"sourceUri\":\"source\","
+                + "\"targetUri\":\"target\",\"jobName\":\"jobname\",\"database\":\"default1\",\"status\":\"SUCCESS\","
+                + "\"eventId\":20},\"table_status\":{\"Table1\":{\"sourceUri\":\"source\",\"targetUri\":\"target\","
+                + "\"jobName\":\"jobname\",\"database\":\"default1\",\"table\":\"Table1\",\"status\":\"SUCCESS\","
+                + "\"eventId\":20},\"table3\":{\"sourceUri\":\"source\",\"targetUri\":\"target\","
+                + "\"jobName\":\"jobname\", \"database\":\"Default1\",\"table\":\"table3\",\"status\":\"FAILURE\","
+                + "\"eventId\":10}, \"table2\":{\"sourceUri\":\"source\",\"targetUri\":\"target\","
+                + "\"jobName\":\"jobname\", \"database\":\"default1\",\"table\":\"table2\",\"status\":\"INIT\"}}}";
+
+        DBReplicationStatus dbStatus = new DBReplicationStatus(jsonString);
+        Assert.assertEquals(dbStatus.getDatabaseStatus().getDatabase(), "default1");
+        Assert.assertEquals(dbStatus.getDatabaseStatus().getJobName(), "jobname");
+        Assert.assertEquals(dbStatus.getDatabaseStatus().getEventId(), 20);
+
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getEventId(), 20);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getStatus(), ReplicationStatus.Status.SUCCESS);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getEventId(), -1);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getStatus(), ReplicationStatus.Status.INIT);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getEventId(), 10);
+        Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getStatus(), ReplicationStatus.Status.FAILURE);
+    }
+
+    public void wrongDBForTableTest() throws Exception {
+        ReplicationStatus newDbStatus = new ReplicationStatus("source", "target", "jobname",
+                "wrongDb", null, ReplicationStatus.Status.FAILURE, 20L);
+        // A DB status on its own (no table statuses) must be accepted.
+        new DBReplicationStatus(newDbStatus);
+
+        // Table statuses whose database differs from the DB status must be rejected...
+        try {
+            new DBReplicationStatus(newDbStatus, tableStatuses);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot set status for table default1.table1, It does not belong to DB wrongdb");
+        }
+
+        // ...and the same check applies when building from JSON.
+        String jsonString = "{\n" + " \"db_status\": {\n"
+                + " \"sourceUri\": \"source\",\n" + " \"targetUri\": \"target\",\n"
+                + " \"jobName\": \"jobname\",\n" + " \"database\": \"wrongdb\",\n"
+                + " \"status\": \"FAILURE\",\n" + " \"eventId\": 20\n" + " },\n"
+                + " \"table_status\": {\"table1\": {\n" + " \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n" + " \"jobName\": \"jobname\",\n"
+                + " \"database\": \"default1\",\n" + " \"table\": \"table1\",\n"
+                + " \"status\": \"SUCCESS\",\n" + " \"eventId\": 20\n" + " }}\n" + "}";
+
+        try {
+            new DBReplicationStatus(jsonString);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Unable to create DBReplicationStatus from JsonString. Cannot set status for "
+                            + "table default1.table1, It does not belong to DB wrongdb");
+        }
+    }
+
+    public void updateTableStatusTest() throws Exception {
+        DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses);
+        replicationStatus.updateTableStatus(tableStatus1);
+
+        // wrong DB test
+        try {
+            replicationStatus.updateTableStatus(new ReplicationStatus("source", "target", "jobname",
+                    "wrongDB", "table2", ReplicationStatus.Status.INIT, -1L));
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update Table Status. TableDB wrongdb does not match current DB default1");
+        }
+
+        // wrong status test: a DB-level status (null table) is not a table status
+        try {
+            replicationStatus.updateTableStatus(dbReplicationStatus);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update Table Status. Table name is empty.");
+        }
+    }
+
+    public void updateDBStatusTest() throws Exception {
+        DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses);
+        replicationStatus.updateDbStatus(dbReplicationStatus);
+
+        // wrong DB test
+        try {
+            replicationStatus.updateDbStatus(new ReplicationStatus("source", "target", "jobname",
+                    "wrongDB", null, ReplicationStatus.Status.INIT, -1L));
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update Database Status. StatusDB wrongdb does not match current DB default1");
+        }
+
+        // wrong status test: a table-level status cannot replace the DB status
+        try {
+            replicationStatus.updateDbStatus(tableStatus1);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Cannot update DB Status. This is table level status.");
+        }
+    }
+
+    public void updateDbStatusFromTableStatusesTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "default1", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "table2", ReplicationStatus.Status.INIT, -1L);
+        ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "Table3", ReplicationStatus.Status.FAILURE, 15L);
+        ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "Table4", ReplicationStatus.Status.FAILURE, 18L);
+        Map<String, ReplicationStatus> tables = new HashMap<String, ReplicationStatus>();
+
+        tables.put("table1", table1);
+        tables.put("table2", table2);
+        tables.put("table3", table3);
+        tables.put("table4", table4);
+
+        // If there is a failure, last eventId should be lowest eventId of failed tables
+        DBReplicationStatus status = new DBReplicationStatus(dbStatus, tables);
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+        status.updateDbStatusFromTableStatuses();
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 15);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.FAILURE);
+
+        // If all tables succeed, last eventId should be highest eventId of success tables
+        table3 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table3", ReplicationStatus.Status.SUCCESS, 25L);
+        table4 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table4", ReplicationStatus.Status.SUCCESS, 22L);
+        // Overwrite the existing "table3"/"table4" entries. Putting them under differently-cased keys
+        // would leave the stale FAILURE statuses in the map next to the new ones, so the outcome would
+        // depend on how DBReplicationStatus resolves those duplicates rather than on this scenario.
+        tables.put("table3", table3);
+        tables.put("table4", table4);
+        status = new DBReplicationStatus(dbStatus, tables);
+        status.updateDbStatusFromTableStatuses();
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 25);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+
+        // Init tables should not change DB status.
+        Map<String, ReplicationStatus> initOnlyTables = new HashMap<String, ReplicationStatus>();
+        initOnlyTables.put("table2", table2);
+        dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "default1", null, ReplicationStatus.Status.SUCCESS, 20L);
+        status = new DBReplicationStatus(dbStatus, initOnlyTables);
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+        status.updateDbStatusFromTableStatuses();
+        Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20);
+        Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
new file mode 100644
index 0000000..1f44b62
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+/**
+ * Manual driver for an end-to-end Hive DR run through {@link HiveDRTool}.
+ *
+ * <p>Neither the class nor {@link #testHiveDr(String[])} carries a TestNG {@code @Test} annotation.
+ * Source and target both point at a metastore on thrift://localhost:9083 and HDFS on
+ * hdfs://localhost:8020.</p>
+ */
+public class DRTest {
+
+    /**
+     * Runs the DR tool once against the local services.
+     *
+     * @param args ignored; the tool arguments are fixed below
+     */
+    public void testHiveDr(String[] args) {
+        // Source and target share the same local endpoints.
+        final String metastoreUri = "thrift://localhost:9083";
+        final String stagingPath = "/apps/hive/tools/dr";
+        final String nameNode = "hdfs://localhost:8020";
+        final String resourceManager = "local";
+
+        String[] drToolArgs = new String[] {
+            "-sourceMetastoreUri", metastoreUri,
+            "-sourceDatabase", "default",
+            "-sourceTable", "test",
+            "-sourceStagingPath", stagingPath,
+            "-sourceNN", nameNode,
+            "-sourceRM", resourceManager,
+
+            "-targetMetastoreUri", metastoreUri,
+            "-targetStagingPath", stagingPath,
+            "-targetNN", nameNode,
+            "-targetRM", resourceManager,
+
+            "-maxEvents", "5",
+            "-replicationMaxMaps", "1",
+            "-distcpMapBandwidth", "4",
+        };
+        HiveDRTool.main(drToolArgs);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java
new file mode 100644
index 0000000..c89c661
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hive;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.hadoop.JailedFileSystem;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Unit tests for HiveDRStatusStore.
+ *
+ * <p>The store is backed by a {@link JailedFileSystem} initialised from an embedded cluster's
+ * default file system URI. {@link #updateReplicationStatusTest()} seeds statuses for job
+ * "jobname" / database default1, which several tests below read back.</p>
+ */
+@Test
+public class HiveDRStatusStoreTest {
+    private final HiveDRStatusStore drStatusStore;
+    private final FileSystem fileSystem = new JailedFileSystem();
+
+    /**
+     * Recreates the store base directory and builds the store under test.
+     *
+     * <p>Also checks that a store created without an explicit group is rejected because the
+     * freshly created base directory does not have the required ownership/permissions.</p>
+     */
+    public HiveDRStatusStoreTest() throws Exception {
+        EmbeddedCluster cluster = EmbeddedCluster.newCluster("hiveReplTest");
+        Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH);
+
+        fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf());
+        // Start from an empty store directory in case an earlier run left one behind.
+        if (fileSystem.exists(storePath)) {
+            fileSystem.delete(storePath, true);
+        }
+        FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION);
+        try {
+            new HiveDRStatusStore(fileSystem);
+            Assert.fail("Expected IOException: base dir ownership/permissions should have been rejected");
+        } catch (IOException ie) {
+            // Exception expected.
+            Assert.assertEquals(ie.getMessage(), "Base dir jail://hiveReplTest:00" + storePath.toUri()
+                    + " does not have correct ownership/permissions."
+                    + " Please set group to " + DRStatusStore.getStoreGroup() + " and permissions to rwxrwx---");
+        }
+        // Passing the directory's actual group makes the ownership check succeed.
+        drStatusStore = new HiveDRStatusStore(fileSystem, fileSystem.getFileStatus(storePath).getGroup());
+    }
+
+    /** Seeds job "jobname" / db default1 with one DB status and four table statuses. */
+    @BeforeClass
+    public void updateReplicationStatusTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "Default1", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "Table2", ReplicationStatus.Status.INIT, -1L);
+        ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname",
+                "Default1", "Table3", ReplicationStatus.Status.FAILURE, 15L);
+        ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname",
+                "default1", "table4", ReplicationStatus.Status.FAILURE, 18L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(table2);
+        replicationStatusList.add(table3);
+        replicationStatusList.add(table4);
+        replicationStatusList.add(dbStatus);
+        drStatusStore.updateReplicationStatus("jobname", replicationStatusList);
+    }
+
+    public void updateReplicationStatusNewTablesTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname2",
+                "default2", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname2",
+                "Default2", "table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "Table2", ReplicationStatus.Status.INIT, -1L);
+        ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "table3", ReplicationStatus.Status.FAILURE, 15L);
+        ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname2",
+                "Default2", "Table4", ReplicationStatus.Status.FAILURE, 18L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(table2);
+        replicationStatusList.add(table3);
+        replicationStatusList.add(table4);
+        replicationStatusList.add(dbStatus);
+
+        drStatusStore.updateReplicationStatus("jobname2", replicationStatusList);
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2");
+        Assert.assertEquals(status.getEventId(), 15);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getJobName(), "jobname2");
+        Assert.assertNull(status.getTable());
+        Assert.assertEquals(status.getSourceUri(), "source");
+
+        Iterator<ReplicationStatus> iter = drStatusStore.getTableReplicationStatusesInDb("source", "target",
+                "jobname2", "default2");
+        Assert.assertEquals(countRemaining(iter), 4);
+
+        // Second round: new table5, updated table3/table4, plus a table in a different DB (default1).
+        table3 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "table3", ReplicationStatus.Status.SUCCESS, 25L);
+        table4 = new ReplicationStatus("source", "target", "jobname2",
+                "Default2", "table4", ReplicationStatus.Status.SUCCESS, 22L);
+        ReplicationStatus table5 = new ReplicationStatus("source", "target", "jobname2",
+                "default2", "Table5", ReplicationStatus.Status.SUCCESS, 18L);
+        ReplicationStatus db1table1 = new ReplicationStatus("source", "target", "jobname2",
+                "Default1", "Table1", ReplicationStatus.Status.SUCCESS, 18L);
+        replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table5);
+        replicationStatusList.add(table3);
+        replicationStatusList.add(table4);
+        replicationStatusList.add(db1table1);
+
+        drStatusStore.updateReplicationStatus("jobname2", replicationStatusList);
+        status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default1");
+        Assert.assertEquals(status.getEventId(), 18);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS);
+
+        status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2");
+        Assert.assertEquals(status.getEventId(), 25);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS);
+
+        iter = drStatusStore.getTableReplicationStatusesInDb("source", "target",
+                "jobname2", "default2");
+        Assert.assertEquals(countRemaining(iter), 5);
+    }
+
+    public void getReplicationStatusDBTest() throws HiveReplicationException {
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname", "Default1");
+        Assert.assertEquals(status.getEventId(), 15);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getJobName(), "jobname");
+        Assert.assertNull(status.getTable());
+        Assert.assertEquals(status.getSourceUri(), "source");
+    }
+
+    public void checkReplicationConflictTest() throws HiveReplicationException {
+        // The following combinations must not be reported as conflicts. Any exception is left to
+        // propagate so TestNG reports it with its full stack trace.
+
+        //same source, same job, same DB, null table : pass
+        drStatusStore.checkForReplicationConflict("source", "jobname", "default1", null);
+
+        //same source, same job, same DB, same table : pass
+        drStatusStore.checkForReplicationConflict("source", "jobname", "default1", "table1");
+
+        //same source, same job, different DB, null table : pass
+        drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", null);
+
+        //same source, same job, different DB, different table : pass
+        drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", "diffTable");
+
+        // same source, different job, same DB, diff table : pass
+        drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "diffTable");
+
+        try {
+            // different source, same job, same DB, null table : fail
+            drStatusStore.checkForReplicationConflict("diffSource", "jobname", "default1", null);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Two different sources are attempting to replicate to same db default1."
+                            + " New Source = diffSource, Existing Source = source");
+        }
+
+        try {
+            // same source, different job, same DB, null table : fail
+            drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", null);
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Two different jobs are attempting to replicate to same db default1."
+                            + " New Job = diffJob, Existing Job = jobname");
+        }
+
+        try {
+            // same source, different job, same DB, same table : fail
+            drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "table1");
+            Assert.fail();
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Two different jobs are trying to replicate to same table table1."
+                            + " New job = diffJob, Existing job = jobname");
+        }
+    }
+
+    public void deleteReplicationStatusTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "deleteJob",
+                "deleteDB", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "deleteJob",
+                "deleteDB", "Table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(dbStatus);
+        drStatusStore.updateReplicationStatus("deleteJob", replicationStatusList);
+
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "deleteJob", "deleteDB");
+        Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName());
+        Assert.assertTrue(fileSystem.exists(statusPath));
+
+        drStatusStore.deleteReplicationStatus("deleteJob", "deleteDB");
+        Assert.assertFalse(fileSystem.exists(statusPath));
+    }
+
+    public void getReplicationStatusTableTest() throws HiveReplicationException {
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "default1", "table1");
+        Assert.assertEquals(status.getEventId(), 20);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS);
+        Assert.assertEquals(status.getTable(), "table1");
+
+        status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "Default1", "Table2");
+        Assert.assertEquals(status.getEventId(), -1);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT);
+        Assert.assertEquals(status.getTable(), "table2");
+
+        status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "default1", "Table3");
+        Assert.assertEquals(status.getEventId(), 15);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getTable(), "table3");
+
+        status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname", "default1", "table4");
+        Assert.assertEquals(status.getEventId(), 18);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+        Assert.assertEquals(status.getTable(), "table4");
+    }
+
+    public void getTableReplicationStatusesInDbTest() throws HiveReplicationException {
+        Iterator<ReplicationStatus> iter = drStatusStore.getTableReplicationStatusesInDb("source", "target",
+                "jobname", "Default1");
+        int size = 0;
+        while (iter.hasNext()) {
+            size++;
+            ReplicationStatus status = iter.next();
+            if ("table3".equals(status.getTable())) {
+                Assert.assertEquals(status.getEventId(), 15);
+                Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE);
+                Assert.assertEquals(status.getTable(), "table3");
+            }
+        }
+        Assert.assertEquals(size, 4);
+    }
+
+    /**
+     * Verifies status-file rotation.
+     *
+     * <p>NOTE(review): this relies on wall-clock spacing (2 s sleeps between updates) and on the
+     * meaning of rotateStatusFiles' last two arguments, which look like (files to keep, age
+     * threshold in ms) - confirm against HiveDRStatusStore before tuning the timings.</p>
+     */
+    public void fileRotationTest() throws Exception {
+        // initialize replication status store for db default3.
+        // This should init with eventId = -1 and status = INIT
+        ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target",
+                "jobname3", "default3");
+        Assert.assertEquals(status.getEventId(), -1);
+        Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT);
+
+        // update status 5 times resulting in 6 files : latest.json + five rotated files
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3",
+                "Default3", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname3",
+                "default3", "Table1", ReplicationStatus.Status.SUCCESS, 20L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(table1);
+        replicationStatusList.add(dbStatus);
+
+        for (int i = 0; i < 5; i++) {
+            Thread.sleep(2000);
+            drStatusStore.updateReplicationStatus("jobname3", replicationStatusList);
+        }
+
+        status = drStatusStore.getReplicationStatus("source", "target", "jobname3", "default3");
+        Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName());
+        RemoteIterator<LocatedFileStatus> iter = fileSystem.listFiles(statusPath, false);
+        Assert.assertEquals(getRemoteIterSize(iter), 6);
+
+        drStatusStore.rotateStatusFiles(statusPath, 3, 10000000);
+        iter = fileSystem.listFiles(statusPath, false);
+        Assert.assertEquals(getRemoteIterSize(iter), 6);
+
+        drStatusStore.rotateStatusFiles(statusPath, 3, 6000);
+        iter = fileSystem.listFiles(statusPath, false);
+        Assert.assertEquals(getRemoteIterSize(iter), 3);
+    }
+
+    public void wrongJobNameTest() throws Exception {
+        ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3",
+                "Default3", null, ReplicationStatus.Status.SUCCESS, 20L);
+        ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>();
+        replicationStatusList.add(dbStatus);
+
+        try {
+            drStatusStore.updateReplicationStatus("jobname2", replicationStatusList);
+            Assert.fail();
+        } catch (HiveReplicationException ignored) {
+            // Expected exception due to jobname mismatch
+        }
+    }
+
+    @AfterClass
+    public void cleanUp() throws IOException {
+        fileSystem.delete(new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH), true);
+    }
+
+    /** Drains {@code iter} and returns the number of entries it produced. */
+    private int getRemoteIterSize(RemoteIterator<LocatedFileStatus> iter) throws IOException {
+        int size = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            size++;
+        }
+        return size;
+    }
+
+    /** Drains {@code iter} and returns the number of elements it produced. */
+    private static int countRemaining(Iterator<?> iter) {
+        int size = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            size++;
+        }
+        return size;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java
new file mode 100644
index 0000000..a02639c
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hive;
+
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for ReplicationStatus: JSON serialization and deserialization,
+ * and validation of event ids and status values.
+ */
+@Test
+public class ReplicationStatusTest {
+
+    private ReplicationStatus dbStatus;
+    private ReplicationStatus tableStatus;
+
+    /**
+     * Creates a database-level status (no table, INIT) and a table-level status (SUCCESS)
+     * used by the serialization test.
+     */
+    @BeforeClass
+    public void prepare() throws Exception {
+        dbStatus = new ReplicationStatus("source", "target", "jobname",
+                "default1", null, ReplicationStatus.Status.INIT, 0L);
+        tableStatus = new ReplicationStatus("source", "target", "jobname",
+                "testDb", "Table1", ReplicationStatus.Status.SUCCESS, 0L);
+    }
+
+    /**
+     * Checks toJsonString(): database and table names are emitted lower-cased
+     * ("testDb"/"Table1" become "testdb"/"table1"), and a null table is left out of the JSON.
+     */
+    public void replicationStatusSerializeTest() throws Exception {
+        String expected = "{\n \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n \"jobName\": \"jobname\",\n"
+                + " \"database\": \"testdb\",\n \"table\": \"table1\",\n"
+                + " \"status\": \"SUCCESS\",\n \"eventId\": 0\n}";
+        String actual = tableStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+
+        expected = "{\n \"sourceUri\": \"source\",\n \"targetUri\": \"target\",\n"
+                + " \"jobName\": \"jobname\",\n \"database\": \"default1\",\n"
+                + " \"status\": \"INIT\",\n \"eventId\": 0\n}";
+        actual = dbStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+    }
+
+    /**
+     * Checks parsing from JSON: the database name is lower-cased, the job name keeps its case,
+     * and when absent the table and statusLog are null and the eventId is -1.
+     */
+    public void replicationStatusDeserializeTest() throws Exception {
+        String tableInput = "{\n \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n"
+                + " \"database\": \"Test1\",\n \"table\": \"table1\",\n"
+                + " \"status\": \"SUCCESS\",\n \"eventId\": 0\n}";
+        String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n"
+                + " \"database\": \"default1\", \"status\": \"FAILURE\","
+                + " \"eventId\": 27, \"statusLog\": \"testLog\"}";
+
+        ReplicationStatus newDbStatus = new ReplicationStatus(dbInput);
+        ReplicationStatus newTableStatus = new ReplicationStatus(tableInput);
+
+        Assert.assertNull(newDbStatus.getTable());
+        Assert.assertEquals(newDbStatus.getEventId(), 27);
+        Assert.assertEquals(newDbStatus.getDatabase(), "default1");
+        Assert.assertEquals(newDbStatus.getLog(), "testLog");
+        Assert.assertEquals(newDbStatus.getStatus(), ReplicationStatus.Status.FAILURE);
+
+        Assert.assertEquals(newTableStatus.getTable(), "table1");
+        Assert.assertEquals(newTableStatus.getEventId(), 0);
+        Assert.assertEquals(newTableStatus.getDatabase(), "test1");
+        Assert.assertEquals(newTableStatus.getJobName(), "testJob");
+
+        // no table, no eventId, no log
+        dbInput = "{\n \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n"
+                + " \"database\": \"Test1\",\n"
+                + " \"status\": \"SUCCESS\"\n}";
+        newDbStatus = new ReplicationStatus(dbInput);
+
+        Assert.assertEquals(newDbStatus.getDatabase(), "test1");
+        Assert.assertNull(newDbStatus.getTable());
+        Assert.assertEquals(newDbStatus.getEventId(), -1);
+        Assert.assertNull(newDbStatus.getLog());
+    }
+
+    /**
+     * A negative eventId, whether parsed from JSON or passed to setEventId, is stored and
+     * serialized as -1; a non-negative value is kept as given.
+     */
+    public void invalidEventIdTest() throws Exception {
+        String tableInput = "{\n \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n"
+                + " \"database\": \"test1\",\n \"table\": \"table1\",\n"
+                + " \"status\": \"SUCCESS\",\n \"eventId\": -100\n}";
+
+        ReplicationStatus newTableStatus = new ReplicationStatus(tableInput);
+        Assert.assertEquals(newTableStatus.getEventId(), -1);
+
+        newTableStatus.setEventId(-200);
+        Assert.assertEquals(newTableStatus.getEventId(), -1);
+
+        String expected = "{\n \"sourceUri\": \"source\",\n"
+                + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n"
+                + " \"database\": \"test1\",\n \"table\": \"table1\",\n"
+                + " \"status\": \"SUCCESS\",\n \"eventId\": -1\n}";
+        String actual = newTableStatus.toJsonString();
+        Assert.assertEquals(actual, expected);
+
+        newTableStatus.setEventId(50);
+        Assert.assertEquals(newTableStatus.getEventId(), 50);
+    }
+
+    /**
+     * An unknown status value in the JSON must be rejected with a HiveReplicationException
+     * whose message names the offending value.
+     */
+    public void invalidStatusTest() throws Exception {
+        String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n"
+                + " \"database\": \"default1\", \"status\": \"BLAH\","
+                + " \"eventId\": 27, \"statusLog\": \"testLog\"}";
+
+        try {
+            new ReplicationStatus(dbInput);
+            Assert.fail("Expected HiveReplicationException for invalid status BLAH");
+        } catch (HiveReplicationException e) {
+            Assert.assertEquals(e.getMessage(),
+                    "Unable to deserialize jsonString to ReplicationStatus. Invalid status BLAH");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
index 824e6f5..de0f748 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml
@@ -17,30 +17,27 @@
limitations under the License.
-->
-<process name="##name##" xmlns="uri:falcon:process:0.1">
+<process name="##falcon.recipe.job.name##" xmlns="uri:falcon:process:0.1">
<clusters>
<!-- source -->
- <cluster name="##src.cluster.name##">
- <validity end="##src.cluster.validity.end##" start="##src.cluster.validity.start##"/>
+ <cluster name="##falcon.recipe.cluster.name##">
+ <validity end="##falcon.recipe.cluster.validity.end##" start="##falcon.recipe.cluster.validity.start##"/>
</cluster>
</clusters>
+ <tags>_falcon_mirroring_type=HDFS</tags>
+
<parallel>1</parallel>
<!-- Dir replication needs to run only once to catch up -->
<order>LAST_ONLY</order>
- <frequency>##process.frequency##</frequency>
+ <frequency>##falcon.recipe.frequency##</frequency>
<timezone>UTC</timezone>
<properties>
<property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
- <property name="##process.property2.name##" value="##process.property2.value##"/>
- <property name="##process.property3.name##" value="##process.property3.value##"/>
- <property name="##process.property4.name##" value="##process.property4.value##"/>
- <property name="##process.property5.name##" value="##process.property5.value##"/>
- <property name="##process.property6.name##" value="##process.property6.value##"/>
- <property name="##process.property7.name##" value="##process.property7.value##"/>
</properties>
- <workflow name="##workflow.name##" engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
- <retry policy="periodic" delay="minutes(10)" attempts="3"/>
-</process>
+ <workflow name="##falcon.recipe.workflow.name##" engine="oozie" path="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml" lib="##workflow.lib.path##"/>
+ <retry policy="##falcon.recipe.retry.policy##" delay="##falcon.recipe.retry.delay##" attempts="3"/>
+ <ACL/>
+</process>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
index 145d489..d6a4ee9 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml
@@ -47,28 +47,78 @@
<name>oozie.launcher.oozie.libpath</name>
<value>${wf:conf("falcon.libpath")}</value>
</property>
+ <property>
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${drSourceClusterFS},${drTargetClusterFS}</value>
+ </property>
</configuration>
<main-class>org.apache.falcon.replication.FeedReplicator</main-class>
<arg>-Dmapred.job.queue.name=${queueName}</arg>
<arg>-Dmapred.job.priority=${jobPriority}</arg>
<arg>-maxMaps</arg>
- <arg>${maxMaps}</arg>
+ <arg>${distcpMaxMaps}</arg>
<arg>-mapBandwidth</arg>
- <arg>${mapBandwidth}</arg>
+ <arg>${distcpMapBandwidth}</arg>
<arg>-sourcePaths</arg>
- <arg>${nameNode}${drSourceDir}</arg>
+ <arg>${drSourceDir}</arg>
<arg>-targetPath</arg>
<arg>${drTargetClusterFS}${drTargetDir}</arg>
<arg>-falconFeedStorageType</arg>
<arg>FILESYSTEM</arg>
+ <arg>-availabilityFlag</arg>
+ <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+ <arg>-counterLogDir</arg>
+ <arg>${logDir}/job-${nominalTime}</arg>
</java>
+ <ok to="success"/>
+ <error to="failure"/>
+ </action>
+ <decision name="success">
+ <switch>
+ <case to="successAlert">
+ ${drNotificationReceivers ne 'NA'}
+ </case>
+ <default to="end"/>
+ </switch>
+ </decision>
+ <decision name="failure">
+ <switch>
+ <case to="failureAlert">
+ ${drNotificationReceivers ne 'NA'}
+ </case>
+ <default to="fail"/>
+ </switch>
+ </decision>
+ <action name="successAlert">
+ <email xmlns="uri:oozie:email-action:0.2">
+ <to>${drNotificationReceivers}</to>
+ <subject>INFO: HDFS DR workflow ${entityName} completed successfully</subject>
+ <body>
+ The HDFS DR workflow ${wf:id()} is successful.
+ Source = ${drSourceDir}
+ Target = ${drTargetClusterFS}${drTargetDir}
+ </body>
+ </email>
+ <ok to="end"/>
+ <error to="end"/>
+ </action>
+ <action name="failureAlert">
+ <email xmlns="uri:oozie:email-action:0.2">
+ <to>${drNotificationReceivers}</to>
+ <subject>ERROR: HDFS DR workflow ${entityName} failed</subject>
+ <body>
+ The workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+ Source = ${drSourceDir}
+ Target = ${drTargetClusterFS}${drTargetDir}
+ </body>
+ </email>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>
- Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>
- <end name='end'/>
+ <end name="end"/>
</workflow-app>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
index 19b8459..64ab6b8 100644
--- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
+++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties
@@ -19,47 +19,59 @@
##### NOTE: This is a TEMPLATE file which can be copied and edited
##### Recipe properties
-falcon.recipe.name=hdfs-replication
-
+##### Unique recipe job name
+falcon.recipe.name=sales-monthly
##### Workflow properties
-
falcon.recipe.workflow.name=hdfs-dr-workflow
# Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS
-falcon.recipe.workflow.path=/recipes/hdfs-replication/hdfs-replication-workflow.xml
+falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml
# Provide Wf lib absolute path. This can be HDFS or local FS path. If libs are on local FS it will be copied to HDFS
#falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib
##### Cluster properties
+# Cluster where job should run
+falcon.recipe.cluster.name=primaryCluster
+# Change the cluster hdfs write end point here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2015-03-13T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
-# Change the src cluster name here
-falcon.recipe.src.cluster.name=test
-# Change the src cluster hdfs write end point here. This is mandatory.
-falcon.recipe.src.cluster.hdfs.writeEndPoint=hdfs://sandbox.hortonworks.com:8020
-# Change the src cluster validity start time here
-falcon.recipe.src.cluster.validity.start=2014-10-01T00:00Z
-# Change the src cluster validity end time here
-falcon.recipe.src.cluster.validity.end=2016-12-30T00:00Z
+##### Scheduling properties
+# Change the recipe frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(5)
+##### Tag properties - An optional comma-separated list of tags, given as key=value pairs
+##### Uncomment to add tags
+#falcon.recipe.tags=
-##### Scheduling properties
+##### Retry policy properties
-# Change the process here. Valid frequency type are minutes, hours, days, months
-falcon.recipe.process.frequency=minutes(60)
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
+##### ACL properties - Change the ACL values below if authorization is enabled
+
+falcon.recipe.acl.owner=ambari-qa
+falcon.recipe.acl.group=users
+falcon.recipe.acl.permission=0x755
+falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM
##### Custom Job properties
-# Specify property names and values for properties defined in recipe template
-falcon.recipe.process.property2.name=drSourceDir
-falcon.recipe.process.property2.value=/falcon/test/srcCluster/input
-falcon.recipe.process.property3.name=drTargetClusterFS
-falcon.recipe.process.property3.value=hdfs://sandbox.hortonworks.com:8020
-falcon.recipe.process.property4.name=drTargetDir
-falcon.recipe.process.property4.value=/falcon/test/targetCluster/input
-falcon.recipe.process.property5.name=drTargetCluster
-falcon.recipe.process.property5.value=backupCluster
-falcon.recipe.process.property6.name=maxMaps
-falcon.recipe.process.property6.value=5
-falcon.recipe.process.property7.name=mapBandwidth
-falcon.recipe.process.property7.value=100
+# Specify multiple comma separated source directories
+drSourceDir=/user/hrt_qa/dr/test/primaryCluster/input
+drSourceClusterFS=hdfs://240.0.0.10:8020
+drTargetDir=/user/hrt_qa/dr/test/backupCluster/input
+drTargetClusterFS=hdfs://240.0.0.11:8020
+
+# Change it to specify the maximum number of mappers for DistCp
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB for each mapper in DistCp
+distcpMapBandwidth=100
+
+##### Email receivers for success and failure alerts (NA disables email alerts)
+drNotificationReceivers=NA
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/README.txt
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/README.txt b/addons/recipes/hive-disaster-recovery/README.txt
new file mode 100644
index 0000000..ab393b1
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/README.txt
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Hive Metastore Disaster Recovery Recipe
+
+Overview
+This recipe replicates Hive metadata and data from one
+Hadoop cluster to another Hadoop cluster.
+It piggybacks on the replication solution in Falcon, which uses the DistCp tool.
+
+Use Case
+*
+*
+
+Limitations
+*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Hive Metastore Disaster Recovery Recipe
+
+Overview
+This recipe replicates Hive metadata and data from one
+Hadoop cluster to another Hadoop cluster.
+It piggybacks on the replication solution in Falcon, which uses the DistCp tool.
+
+Use Case
+*
+*
+
+Limitations
+*
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/pom.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/pom.xml b/addons/recipes/hive-disaster-recovery/pom.xml
new file mode 100644
index 0000000..1732907
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.falcon.recipes</groupId>
+ <artifactId>falcon-hive-replication-recipe</artifactId>
+ <version>0.7-SNAPSHOT</version>
+ <description>Apache Falcon Hive Disaster Recovery Recipe</description>
+ <name>Apache Falcon Sample Hive Disaster Recovery Recipe</name>
+ <packaging>jar</packaging>
+</project>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
new file mode 100644
index 0000000..3afbef0
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="##name##" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <!-- source -->
+ <cluster name="##cluster.name##">
+ <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/>
+ </cluster>
+ </clusters>
+
+ <tags>_falcon_mirroring_type=HIVE</tags>
+
+ <parallel>1</parallel>
+ <!-- Replication needs to run only once to catch up -->
+ <order>LAST_ONLY</order>
+ <frequency>##process.frequency##</frequency>
+ <timezone>UTC</timezone>
+
+ <properties>
+ <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+ </properties>
+
+ <workflow name="##workflow.name##" engine="oozie"
+ path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
+ <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
+ <ACL/>
+</process>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
new file mode 100644
index 0000000..7362c2e
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
@@ -0,0 +1,401 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'>
+ <credentials>
+ <credential name='hive_src_credentials' type='hcat'>
+ <property>
+ <name>hcat.metastore.uri</name>
+ <value>${sourceMetastoreUri}</value>
+ </property>
+ <property>
+ <name>hcat.metastore.principal</name>
+ <value>${sourceHiveMetastoreKerberosPrincipal}</value>
+ </property>
+ </credential>
+ <credential name='hive_tgt_credentials' type='hcat'>
+ <property>
+ <name>hcat.metastore.uri</name>
+ <value>${targetMetastoreUri}</value>
+ </property>
+ <property>
+ <name>hcat.metastore.principal</name>
+ <value>${targetHiveMetastoreKerberosPrincipal}</value>
+ </property>
+ </credential>
+ <credential name="hive2_src_credentials" type="hive2">
+ <property>
+ <name>hive2.server.principal</name>
+ <value>${sourceHive2KerberosPrincipal}</value>
+ </property>
+ <property>
+ <name>hive2.jdbc.url</name>
+ <value>jdbc:${sourceHiveServer2Uri}/${sourceDatabase}</value>
+ </property>
+ </credential>
+ <credential name="hive2_tgt_credentials" type="hive2">
+ <property>
+ <name>hive2.server.principal</name>
+ <value>${targetHive2KerberosPrincipal}</value>
+ </property>
+ <property>
+ <name>hive2.jdbc.url</name>
+ <value>jdbc:${targetHiveServer2Uri}/${sourceDatabase}</value>
+ </property>
+ </credential>
+ </credentials>
+ <start to='last-event'/>
+ <action name="last-event" cred="hive_tgt_credentials">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp,hive,hive2,hcatalog</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ <property>
+ <name>mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-falconLibPath</arg>
+ <arg>${wf:conf("falcon.libpath")}</arg>
+ <arg>-sourceCluster</arg>
+ <arg>${sourceCluster}</arg>
+ <arg>-sourceMetastoreUri</arg>
+ <arg>${sourceMetastoreUri}</arg>
+ <arg>-sourceHiveServer2Uri</arg>
+ <arg>${sourceHiveServer2Uri}</arg>
+ <arg>-sourceDatabase</arg>
+ <arg>${sourceDatabase}</arg>
+ <arg>-sourceTable</arg>
+ <arg>${sourceTable}</arg>
+ <arg>-sourceStagingPath</arg>
+ <arg>${sourceStagingPath}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-sourceNNKerberosPrincipal</arg>
+ <arg>${sourceNNKerberosPrincipal}</arg>
+ <arg>-sourceHiveMetastoreKerberosPrincipal</arg>
+ <arg>${sourceHiveMetastoreKerberosPrincipal}</arg>
+ <arg>-sourceHive2KerberosPrincipal</arg>
+ <arg>${sourceHive2KerberosPrincipal}</arg>
+ <arg>-targetCluster</arg>
+ <arg>${targetCluster}</arg>
+ <arg>-targetMetastoreUri</arg>
+ <arg>${targetMetastoreUri}</arg>
+ <arg>-targetHiveServer2Uri</arg>
+ <arg>${targetHiveServer2Uri}</arg>
+ <arg>-targetStagingPath</arg>
+ <arg>${targetStagingPath}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-targetNNKerberosPrincipal</arg>
+ <arg>${targetNNKerberosPrincipal}</arg>
+ <arg>-targetHiveMetastoreKerberosPrincipal</arg>
+ <arg>${targetHiveMetastoreKerberosPrincipal}</arg>
+ <arg>-targetHive2KerberosPrincipal</arg>
+ <arg>${targetHive2KerberosPrincipal}</arg>
+ <arg>-maxEvents</arg>
+ <arg>${maxEvents}</arg>
+ <arg>-clusterForJobRun</arg>
+ <arg>${clusterForJobRun}</arg>
+ <arg>-clusterForJobRunWriteEP</arg>
+ <arg>${clusterForJobRunWriteEP}</arg>
+ <arg>-clusterForJobNNKerberosPrincipal</arg>
+ <arg>${clusterForJobNNKerberosPrincipal}</arg>
+ <arg>-drJobName</arg>
+ <arg>${drJobName}-${nominalTime}</arg>
+ <arg>-executionStage</arg>
+ <arg>lastevents</arg>
+ </java>
+ <ok to="export-dr-replication"/>
+ <error to="failure"/>
+ </action>
+ <!-- Export Replication action -->
+ <action name="export-dr-replication" cred="hive_src_credentials,hive2_src_credentials">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp,hive,hive2,hcatalog</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ <property>
+ <name>mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-falconLibPath</arg>
+ <arg>${wf:conf("falcon.libpath")}</arg>
+ <arg>-replicationMaxMaps</arg>
+ <arg>${replicationMaxMaps}</arg>
+ <arg>-distcpMaxMaps</arg>
+ <arg>${distcpMaxMaps}</arg>
+ <arg>-sourceCluster</arg>
+ <arg>${sourceCluster}</arg>
+ <arg>-sourceMetastoreUri</arg>
+ <arg>${sourceMetastoreUri}</arg>
+ <arg>-sourceHiveServer2Uri</arg>
+ <arg>${sourceHiveServer2Uri}</arg>
+ <arg>-sourceDatabase</arg>
+ <arg>${sourceDatabase}</arg>
+ <arg>-sourceTable</arg>
+ <arg>${sourceTable}</arg>
+ <arg>-sourceStagingPath</arg>
+ <arg>${sourceStagingPath}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-sourceNNKerberosPrincipal</arg>
+ <arg>${sourceNNKerberosPrincipal}</arg>
+ <arg>-sourceHiveMetastoreKerberosPrincipal</arg>
+ <arg>${sourceHiveMetastoreKerberosPrincipal}</arg>
+ <arg>-sourceHive2KerberosPrincipal</arg>
+ <arg>${sourceHive2KerberosPrincipal}</arg>
+ <arg>-targetCluster</arg>
+ <arg>${targetCluster}</arg>
+ <arg>-targetMetastoreUri</arg>
+ <arg>${targetMetastoreUri}</arg>
+ <arg>-targetHiveServer2Uri</arg>
+ <arg>${targetHiveServer2Uri}</arg>
+ <arg>-targetStagingPath</arg>
+ <arg>${targetStagingPath}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-targetNNKerberosPrincipal</arg>
+ <arg>${targetNNKerberosPrincipal}</arg>
+ <arg>-targetHiveMetastoreKerberosPrincipal</arg>
+ <arg>${targetHiveMetastoreKerberosPrincipal}</arg>
+ <arg>-targetHive2KerberosPrincipal</arg>
+ <arg>${targetHive2KerberosPrincipal}</arg>
+ <arg>-maxEvents</arg>
+ <arg>${maxEvents}</arg>
+ <arg>-distcpMapBandwidth</arg>
+ <arg>${distcpMapBandwidth}</arg>
+ <arg>-clusterForJobRun</arg>
+ <arg>${clusterForJobRun}</arg>
+ <arg>-clusterForJobRunWriteEP</arg>
+ <arg>${clusterForJobRunWriteEP}</arg>
+ <arg>-clusterForJobNNKerberosPrincipal</arg>
+ <arg>${clusterForJobNNKerberosPrincipal}</arg>
+ <arg>-drJobName</arg>
+ <arg>${drJobName}-${nominalTime}</arg>
+ <arg>-executionStage</arg>
+ <arg>export</arg>
+ </java>
+ <ok to="import-dr-replication"/>
+ <error to="failure"/>
+ </action>
+ <!-- Import Replication action: second stage, reached when the export stage above succeeds; runs HiveDRTool with -executionStage import -->
+ <action name="import-dr-replication" cred="hive_tgt_credentials,hive2_tgt_credentials"> <!-- credential names suggest target-side metastore/HiveServer2 credentials declared elsewhere in this workflow; TODO confirm -->
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter; presumably makes the launcher prefer the job's own jars on its classpath -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property> <!-- Oozie sharelibs used by this Java action: distcp, hive, hive2, hcatalog -->
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp,hive,hive2,hcatalog</value>
+ </property>
+ <property> <!-- both NameNodes listed, presumably so HDFS delegation tokens are obtained for source and target -->
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ <property> <!-- same NameNode list for jobs launched by the tool, not only the Oozie launcher -->
+ <name>mapreduce.job.hdfs-servers</name>
+ <value>${sourceNN},${targetNN}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg> <!-- -D options come before the tool options; presumably consumed by Hadoop generic option parsing -->
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-falconLibPath</arg>
+ <arg>${wf:conf("falcon.libpath")}</arg>
+ <arg>-replicationMaxMaps</arg>
+ <arg>${replicationMaxMaps}</arg>
+ <arg>-distcpMaxMaps</arg>
+ <arg>${distcpMaxMaps}</arg>
+ <arg>-sourceCluster</arg>
+ <arg>${sourceCluster}</arg>
+ <arg>-sourceMetastoreUri</arg>
+ <arg>${sourceMetastoreUri}</arg>
+ <arg>-sourceHiveServer2Uri</arg>
+ <arg>${sourceHiveServer2Uri}</arg>
+ <arg>-sourceDatabase</arg>
+ <arg>${sourceDatabase}</arg>
+ <arg>-sourceTable</arg>
+ <arg>${sourceTable}</arg>
+ <arg>-sourceStagingPath</arg>
+ <arg>${sourceStagingPath}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-sourceNNKerberosPrincipal</arg>
+ <arg>${sourceNNKerberosPrincipal}</arg>
+ <arg>-sourceHiveMetastoreKerberosPrincipal</arg>
+ <arg>${sourceHiveMetastoreKerberosPrincipal}</arg>
+ <arg>-sourceHive2KerberosPrincipal</arg>
+ <arg>${sourceHive2KerberosPrincipal}</arg>
+ <arg>-targetCluster</arg>
+ <arg>${targetCluster}</arg>
+ <arg>-targetMetastoreUri</arg>
+ <arg>${targetMetastoreUri}</arg>
+ <arg>-targetHiveServer2Uri</arg>
+ <arg>${targetHiveServer2Uri}</arg>
+ <arg>-targetStagingPath</arg>
+ <arg>${targetStagingPath}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-targetNNKerberosPrincipal</arg>
+ <arg>${targetNNKerberosPrincipal}</arg>
+ <arg>-targetHiveMetastoreKerberosPrincipal</arg>
+ <arg>${targetHiveMetastoreKerberosPrincipal}</arg>
+ <arg>-targetHive2KerberosPrincipal</arg>
+ <arg>${targetHive2KerberosPrincipal}</arg>
+ <arg>-maxEvents</arg>
+ <arg>${maxEvents}</arg>
+ <arg>-distcpMapBandwidth</arg>
+ <arg>${distcpMapBandwidth}</arg>
+ <arg>-clusterForJobRun</arg>
+ <arg>${clusterForJobRun}</arg>
+ <arg>-clusterForJobRunWriteEP</arg>
+ <arg>${clusterForJobRunWriteEP}</arg>
+ <arg>-clusterForJobNNKerberosPrincipal</arg>
+ <arg>${clusterForJobNNKerberosPrincipal}</arg>
+ <arg>-drJobName</arg>
+ <arg>${drJobName}-${nominalTime}</arg> <!-- job name suffixed with the instance's nominal time -->
+ <arg>-executionStage</arg>
+ <arg>import</arg> <!-- selects the import stage; the export action passes 'export' -->
+ </java>
+ <ok to="success"/> <!-- 'success' and 'failure' are the notification decision nodes below -->
+ <error to="failure"/>
+ </action>
+ <decision name="success"> <!-- send a success email unless drNotificationReceivers is 'NA'; otherwise finish normally -->
+ <switch>
+ <case to="successAlert">
+ ${drNotificationReceivers ne 'NA'}
+ </case>
+ <default to="end"/>
+ </switch>
+ </decision>
+ <decision name="failure"> <!-- send a failure email unless drNotificationReceivers is 'NA'; otherwise go straight to the kill node -->
+ <switch>
+ <case to="failureAlert">
+ ${drNotificationReceivers ne 'NA'}
+ </case>
+ <default to="fail"/>
+ </switch>
+ </decision>
+ <action name="successAlert"> <!-- success notification email to drNotificationReceivers -->
+ <email xmlns="uri:oozie:email-action:0.2">
+ <to>${drNotificationReceivers}</to>
+ <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
+ <body>
+ The Hive DR workflow ${wf:id()} is successful.
+ Source = ${sourceCluster}
+ Target = ${targetCluster}
+ DB Name = ${sourceDatabase}
+ Table Name = ${sourceTable}
+ </body>
+ </email>
+ <ok to="end"/>
+ <error to="end"/> <!-- a failed notification does not fail an otherwise successful run -->
+ </action>
+ <action name="failureAlert"> <!-- failure notification email, entered from the "failure" decision -->
+ <email xmlns="uri:oozie:email-action:0.2">
+ <to>${drNotificationReceivers}</to>
+ <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
+ <body>
+ The Hive DR workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+ Source = ${sourceCluster}
+ Target = ${targetCluster}
+ DB Name = ${sourceDatabase}
+ Table Name = ${sourceTable}
+ </body>
+ </email>
+ <ok to="fail"/> <!-- finish in the kill node so a failed replication run is reported as KILLED, not SUCCEEDED -->
+ <error to="fail"/>
+ </action>
+ <kill name="fail"> <!-- terminal failure node; reports the error message of the last failed action -->
+ <message>
+ Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ </message>
+ </kill>
+ <end name="end"/> <!-- terminal success node -->
+</workflow-app>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
new file mode 100644
index 0000000..b2d670a
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##### NOTE: This is a TEMPLATE file which can be copied and edited
+
+##### Recipe properties
+falcon.recipe.name=hive-disaster-recovery
+
+
+##### Workflow properties
+falcon.recipe.workflow.name=hive-dr-workflow
+# Provide the workflow's absolute path. This can be an HDFS or a local FS path; if the workflow is on the local FS it will be copied to HDFS
+falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-secure-workflow.xml
+
+##### Cluster properties
+
+# Change the cluster name where replication job should run here
+falcon.recipe.cluster.name=backupCluster
+# Change the cluster hdfs write end point here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2014-10-01T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
+# Change the cluster namenode kerberos principal. This is mandatory on secure clusters.
+falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM
+
+##### Scheduling properties
+
+# Change the process frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(60)
+
+##### Retry policy properties
+
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
+
+##### Tag properties - An optional comma-separated list of key=value tags
+##### Uncomment to add tags
+#falcon.recipe.tags=owner=landing,pipeline=adtech
+
+##### ACL properties - Uncomment and change ACL if authorization is enabled
+
+#falcon.recipe.acl.owner=testuser
+#falcon.recipe.acl.group=group
+#falcon.recipe.acl.permission=0x755
+
+##### Custom Job properties
+
+##### Source Cluster DR properties
+sourceCluster=primaryCluster
+sourceMetastoreUri=thrift://localhost:9083
+sourceHiveServer2Uri=hive2://localhost:10000
+# For DB level replication, to replicate multiple databases specify a comma separated list of databases
+sourceDatabase=default
+# For DB level replication specify * for sourceTable.
+# For table level replication, to replicate multiple tables specify a comma separated list of tables
+sourceTable=testtable_dr
+sourceStagingPath=/apps/hive/tools/dr
+sourceNN=hdfs://localhost:8020
+# Specify kerberos principal required to access source namenode and hive servers, optional on non-secure cluster.
+sourceNNKerberosPrincipal=nn/_HOST@EXAMPLE.COM
+sourceHiveMetastoreKerberosPrincipal=hive/_HOST@EXAMPLE.COM
+sourceHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM
+
+##### Target Cluster DR properties
+targetCluster=backupCluster
+targetMetastoreUri=thrift://localhost:9083
+targetHiveServer2Uri=hive2://localhost:10000
+targetStagingPath=/apps/hive/tools/dr
+targetNN=hdfs://localhost:8020
+# Specify kerberos principal required to access target namenode and hive servers, optional on non-secure cluster.
+targetNNKerberosPrincipal=nn/_HOST@EXAMPLE.COM
+targetHiveMetastoreKerberosPrincipal=hive/_HOST@EXAMPLE.COM
+targetHive2KerberosPrincipal=hive/_HOST@EXAMPLE.COM
+
+# Caps the number of events processed each time the job runs. Set it according to your bandwidth limit.
+# Setting it to -1 processes all the events but can consume all available bandwidth. Use it judiciously!
+maxEvents=-1
+# Change it to specify the maximum number of mappers for replication
+replicationMaxMaps=5
+# Change it to specify the maximum number of mappers for DistCP
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB for each mapper in DistCP
+distcpMapBandwidth=100
+
+##### Email recipients for success and failure notifications; set to NA to disable notifications
+drNotificationReceivers=NA
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
new file mode 100644
index 0000000..3afbef0
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="##name##" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <!-- cluster on which the replication job runs (falcon.recipe.cluster.name) -->
+ <cluster name="##cluster.name##">
+ <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/>
+ </cluster>
+ </clusters>
+
+ <tags>_falcon_mirroring_type=HIVE</tags>
+
+ <parallel>1</parallel>
+ <!-- One instance at a time; with LAST_ONLY a single (latest) run is enough to catch up -->
+ <order>LAST_ONLY</order>
+ <frequency>##process.frequency##</frequency>
+ <timezone>UTC</timezone>
+
+ <properties>
+ <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+ </properties>
+
+ <workflow name="##workflow.name##" engine="oozie"
+ path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/> <!-- NOTE(review): path is fixed rather than taken from falcon.recipe.workflow.path; confirm this is intended for the secure recipe too -->
+ <retry policy="##retry.policy##" delay="##retry.delay##" attempts="##retry.attempts##"/> <!-- all retry settings come from the falcon.recipe.retry.* properties -->
+ <ACL/>
+</process>