You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/02/18 10:54:30 UTC

[4/6] incubator-asterixdb git commit: Asterix NCs Failback Support

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql
new file mode 100644
index 0000000..bd01d99
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+                     kill one node and wait until the failover complete, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql
new file mode 100644
index 0000000..b09c3d3
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+                     kill one node and wait until the failover complete, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql
new file mode 100644
index 0000000..56a88a2
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+                     kill one node and wait until the failover complete, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
+use dataverse TinySocial;
+
+/* insert ids 11-20 */
+insert into dataset TinySocial.FacebookUsersInMemory {"id":11,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":12,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":13,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":14,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":15,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":16,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":17,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":18,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":19,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":20,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql
new file mode 100644
index 0000000..67b492c
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql
@@ -0,0 +1 @@
+startnode -n asterix -nodes nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql
new file mode 100644
index 0000000..1746da6
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql
@@ -0,0 +1 @@
+10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
index ae14ad0..94ecc27 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
@@ -29,4 +29,4 @@
 use dataverse TinySocial;
 
 load dataset FacebookUsers using localfs
-(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
deleted file mode 100644
index 5695ed7..0000000
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
+++ /dev/null
@@ -1 +0,0 @@
-nc2 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
index 8087689..d97f786 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
@@ -29,6 +29,6 @@
 use dataverse TinySocial;
 
 load dataset FacebookUsers using localfs
-(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
 
 insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
deleted file mode 100644
index 5695ed7..0000000
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
+++ /dev/null
@@ -1 +0,0 @@
-nc2 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
deleted file mode 100644
index 5eec164..0000000
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
+++ /dev/null
@@ -1 +0,0 @@
-nc1 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
new file mode 100644
index 0000000..61322c9
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
@@ -0,0 +1 @@
+{"State":"ACTIVE","Metadata_Node":"asterix_nc1","partition_0":"asterix_nc1","partition_1":"asterix_nc1","partition_2":"asterix_nc2","partition_3":"asterix_nc2"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
new file mode 100644
index 0000000..587a97a
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
@@ -0,0 +1 @@
+{"State":"ACTIVE","Metadata_Node":"asterix_nc2","partition_0":"asterix_nc2","partition_1":"asterix_nc2","partition_2":"asterix_nc2","partition_3":"asterix_nc2"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm
new file mode 100644
index 0000000..2edeafb
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm
@@ -0,0 +1 @@
+20
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm
new file mode 100644
index 0000000..9a03714
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm
@@ -0,0 +1 @@
+10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
index f033086..36c3992 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -17,21 +17,28 @@
  ! under the License.
  !-->
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
-  <test-group name="failover">
-    <test-case FilePath="failover">
-      <compilation-unit name="bulkload">
-        <output-dir compare="Text">bulkload</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="failover">
-      <compilation-unit name="mem_component_recovery">
-        <output-dir compare="Text">mem_component_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="failover">
-      <compilation-unit name="metadata_node">
-        <output-dir compare="Text">metadata_node</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
+    <test-group name="failover">
+        <test-case FilePath="failover">
+            <compilation-unit name="bulkload">
+                <output-dir compare="Text">bulkload</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="failover">
+            <compilation-unit name="mem_component_recovery">
+                <output-dir compare="Text">mem_component_recovery</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="failover">
+            <compilation-unit name="metadata_node">
+                <output-dir compare="Text">metadata_node</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group>
+    <test-group name="failback">
+        <test-case FilePath="failback">
+            <compilation-unit name="node_failback">
+                <output-dir compare="Text">node_failback</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group>
 </test-suite>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 4e6a3df..088a85b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -28,6 +28,7 @@ import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -113,6 +114,7 @@ public class MetadataNode implements IMetadataNode {
 
     private IDatasetLifecycleManager datasetLifecycleManager;
     private ITransactionSubsystem transactionSubsystem;
+    private int metadataStoragePartition;
 
     public static final MetadataNode INSTANCE = new MetadataNode();
 
@@ -123,6 +125,8 @@ public class MetadataNode implements IMetadataNode {
     public void initialize(IAsterixAppRuntimeContext runtimeContext) {
         this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
         this.datasetLifecycleManager = runtimeContext.getDatasetLifecycleManager();
+        this.metadataStoragePartition = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties()
+                .getMetadataPartition().getPartitionId();
     }
 
     @Override
@@ -305,11 +309,11 @@ public class MetadataNode implements IMetadataNode {
         if (metadataIndex.isPrimaryIndex()) {
             return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
                     metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                    transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+                    transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
         } else {
             return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
                     metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
-                    transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+                    transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
         }
     }
 
@@ -641,8 +645,7 @@ public class MetadataNode implements IMetadataNode {
         }
     }
 
-    private List<Datatype> getDataverseDatatypes(JobId jobId, String dataverseName)
-            throws MetadataException, RemoteException {
+    private List<Datatype> getDataverseDatatypes(JobId jobId, String dataverseName) throws MetadataException {
         try {
             ITupleReference searchKey = createTuple(dataverseName);
             DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false);
@@ -673,7 +676,7 @@ public class MetadataNode implements IMetadataNode {
         }
     }
 
-    public List<Dataset> getAllDatasets(JobId jobId) throws MetadataException, RemoteException {
+    public List<Dataset> getAllDatasets(JobId jobId) throws MetadataException {
         try {
             ITupleReference searchKey = null;
             DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false);
@@ -686,7 +689,7 @@ public class MetadataNode implements IMetadataNode {
         }
     }
 
-    public List<Datatype> getAllDatatypes(JobId jobId) throws MetadataException, RemoteException {
+    public List<Datatype> getAllDatatypes(JobId jobId) throws MetadataException {
         try {
             ITupleReference searchKey = null;
             DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false);
@@ -699,8 +702,7 @@ public class MetadataNode implements IMetadataNode {
         }
     }
 
-    private void confirmDataverseCanBeDeleted(JobId jobId, String dataverseName)
-            throws MetadataException, RemoteException {
+    private void confirmDataverseCanBeDeleted(JobId jobId, String dataverseName) throws MetadataException {
         //If a dataset from a DIFFERENT dataverse
         //uses a type from this dataverse
         //throw an error
@@ -717,13 +719,13 @@ public class MetadataNode implements IMetadataNode {
     }
 
     private void confirmDatatypeIsUnused(JobId jobId, String dataverseName, String datatypeName)
-            throws MetadataException, RemoteException {
+            throws MetadataException {
         confirmDatatypeIsUnusedByDatatypes(jobId, dataverseName, datatypeName);
         confirmDatatypeIsUnusedByDatasets(jobId, dataverseName, datatypeName);
     }
 
     private void confirmDatatypeIsUnusedByDatasets(JobId jobId, String dataverseName, String datatypeName)
-            throws MetadataException, RemoteException {
+            throws MetadataException {
         //If any dataset uses this type, throw an error
         List<Dataset> datasets = getAllDatasets(jobId);
         for (Dataset set : datasets) {
@@ -735,7 +737,7 @@ public class MetadataNode implements IMetadataNode {
     }
 
     private void confirmDatatypeIsUnusedByDatatypes(JobId jobId, String dataverseName, String datatypeName)
-            throws MetadataException, RemoteException {
+            throws MetadataException {
         //If any datatype uses this type, throw an error
         //TODO: Currently this loads all types into memory. This will need to be fixed for large numbers of types
         List<Datatype> datatypes = getAllDatatypes(jobId);
@@ -768,7 +770,7 @@ public class MetadataNode implements IMetadataNode {
     }
 
     public List<String> getDatasetNamesPartitionedOnThisNodeGroup(JobId jobId, String nodegroup)
-            throws MetadataException, RemoteException {
+            throws MetadataException {
         //this needs to scan the datasets and return the datasets that use this nodegroup
         List<String> nodeGroupDatasets = new ArrayList<String>();
         List<Dataset> datasets = getAllDatasets(jobId);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 2744630..f79385c 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,14 +38,24 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.common.messaging.ReplicaEventMessage;
 import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
 import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
 import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
 import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.NodeFailbackPlan;
+import org.apache.asterix.common.replication.NodeFailbackPlan.FailbackPlanState;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * A holder class for properties related to the Asterix cluster.
@@ -57,15 +69,16 @@ public class AsterixClusterProperties {
      */
 
     private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
-
     public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
     public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
 
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
     private static final String IO_DEVICES = "iodevices";
     private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-    private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
+    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<String, Map<String, String>>();
 
     private final Cluster cluster;
+    private ClusterState state = ClusterState.UNUSABLE;
 
     private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
 
@@ -75,10 +88,14 @@ public class AsterixClusterProperties {
     private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
     private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
 
-    private long takeoverRequestId = 0;
+    private long clusterRequestId = 0;
     private String currentMetadataNode = null;
-    private boolean isMetadataNodeActive = false;
+    private boolean metadataNodeActive = false;
     private boolean autoFailover = false;
+    private boolean replicationEnabled = false;
+    private Set<String> failedNodes = new HashSet<>();
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
 
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -99,43 +116,73 @@ public class AsterixClusterProperties {
                 node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
                 clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
                 currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
-                if (isAutoFailoverEnabled()) {
-                    autoFailover = cluster.getDataReplication().isAutoFailover();
-                }
+                replicationEnabled = isReplicationEnabled();
+                autoFailover = isAutoFailoverEnabled();
                 if (autoFailover) {
                     pendingTakeoverRequests = new HashMap<>();
+                    pendingProcessingFailbackPlans = new LinkedList<>();
+                    planId2FailbackPlanMap = new HashMap<>();
                 }
             }
         }
     }
 
-    private ClusterState state = ClusterState.UNUSABLE;
-
     public synchronized void removeNCConfiguration(String nodeId) {
-        updateNodePartitions(nodeId, false);
-        ncConfiguration.remove(nodeId);
-        if (nodeId.equals(currentMetadataNode)) {
-            isMetadataNodeActive = false;
-            LOGGER.info("Metadata node is now inactive");
-        }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + nodeId);
         }
-        if (autoFailover) {
-            requestPartitionsTakeover(nodeId);
+        activeNcConfiguration.remove(nodeId);
+
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                notifyFailbackPlansNodeFailure(nodeId);
+                revertFailedFailbackPlanEffects();
+            }
+        } else {
+            //an active node failed
+            failedNodes.add(nodeId);
+            if (nodeId.equals(currentMetadataNode)) {
+                metadataNodeActive = false;
+                LOGGER.info("Metadata node is now inactive");
+            }
+            updateNodePartitions(nodeId, false);
+            if (replicationEnabled) {
+                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+                if (autoFailover) {
+                    notifyFailbackPlansNodeFailure(nodeId);
+                    requestPartitionsTakeover(nodeId);
+                }
+            }
         }
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
-        ncConfiguration.put(nodeId, configuration);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registering configuration parameters for node id " + nodeId);
+        }
+        activeNcConfiguration.put(nodeId, configuration);
+
+        //a node trying to come back after failure
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                prepareFailbackPlan(nodeId);
+                return;
+            } else {
+                //a node completed local or remote recovery and rejoined
+                failedNodes.remove(nodeId);
+                if (replicationEnabled) {
+                    //notify other replica to reconnect to this node
+                    notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+                }
+            }
+        }
+
         if (nodeId.equals(currentMetadataNode)) {
-            isMetadataNodeActive = true;
+            metadataNodeActive = true;
             LOGGER.info("Metadata node is now active");
         }
         updateNodePartitions(nodeId, true);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" Registering configuration parameters for node id " + nodeId);
-        }
     }
 
     private synchronized void updateNodePartitions(String nodeId, boolean added) {
@@ -163,11 +210,17 @@ public class AsterixClusterProperties {
             }
         }
         //if all storage partitions are active as well as the metadata node, then the cluster is active
-        if (isMetadataNodeActive) {
+        if (metadataNodeActive) {
             state = ClusterState.ACTIVE;
             LOGGER.info("Cluster is now ACTIVE");
             //start global recovery
             AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
+            if (autoFailover) {
+                //if there are any pending failback requests, process them
+                if (pendingProcessingFailbackPlans.size() > 0) {
+                    processPendingFailbackPlans();
+                }
+            }
         } else {
             requestMetadataNodeTakeover();
         }
@@ -196,7 +249,7 @@ public class AsterixClusterProperties {
      *         if it does not correspond to the set of registered Node Controllers.
      */
     public synchronized String[] getIODevices(String nodeId) {
-        Map<String, String> ncConfig = ncConfiguration.get(nodeId);
+        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
         if (ncConfig == null) {
             if (LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -222,7 +275,7 @@ public class AsterixClusterProperties {
 
     public synchronized Set<String> getParticipantNodes() {
         Set<String> participantNodes = new HashSet<String>();
-        for (String pNode : ncConfiguration.keySet()) {
+        for (String pNode : activeNcConfiguration.keySet()) {
             participantNodes.add(pNode);
         }
         return participantNodes;
@@ -254,12 +307,12 @@ public class AsterixClusterProperties {
         this.globalRecoveryCompleted = globalRecoveryCompleted;
     }
 
-    public static boolean isClusterActive() {
-        if (AsterixClusterProperties.INSTANCE.getCluster() == null) {
+    public boolean isClusterActive() {
+        if (cluster == null) {
             // this is a virtual cluster
             return true;
         }
-        return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
+        return state == ClusterState.ACTIVE;
     }
 
     public static int getNumberOfNodes() {
@@ -279,8 +332,8 @@ public class AsterixClusterProperties {
 
     public synchronized ClusterPartition[] getClusterPartitons() {
         ArrayList<ClusterPartition> partitons = new ArrayList<>();
-        for (ClusterPartition cluster : clusterPartitions.values()) {
-            partitons.add(cluster);
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            partitons.add(partition);
         }
         return partitons.toArray(new ClusterPartition[] {});
     }
@@ -301,44 +354,53 @@ public class AsterixClusterProperties {
 
         //collect the partitions of the failed NC
         List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
-        for (ClusterPartition partition : lostPartitions) {
-            //find replicas for this partitions
-            Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
-            //find a replica that is still active
-            for (String replica : partitionReplicas) {
-                //TODO (mhubail) currently this assigns the partition to the first found active replica.
-                //It needs to be modified to consider load balancing.
-                if (ncConfiguration.containsKey(replica)) {
-                    if (!partitionRecoveryPlan.containsKey(replica)) {
-                        List<Integer> replicaPartitions = new ArrayList<>();
-                        replicaPartitions.add(partition.getPartitionId());
-                        partitionRecoveryPlan.put(replica, replicaPartitions);
-                    } else {
-                        partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+        if (lostPartitions.size() > 0) {
+            for (ClusterPartition partition : lostPartitions) {
+                //find replicas for this partition
+                Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                //find a replica that is still active
+                for (String replica : partitionReplicas) {
+                    //TODO (mhubail) currently this assigns the partition to the first found active replica.
+                    //It needs to be modified to consider load balancing.
+                    if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+                        if (!partitionRecoveryPlan.containsKey(replica)) {
+                            List<Integer> replicaPartitions = new ArrayList<>();
+                            replicaPartitions.add(partition.getPartitionId());
+                            partitionRecoveryPlan.put(replica, replicaPartitions);
+                        } else {
+                            partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+                        }
                     }
                 }
             }
-        }
 
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
-                .getCCApplicationContext().getMessageBroker();
-        //For each replica, send a request to takeover the assigned partitions
-        for (String replica : partitionRecoveryPlan.keySet()) {
-            Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
-            long requestId = takeoverRequestId++;
-            TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, replica,
-                    failedNodeId, partitionsToTakeover);
-            pendingTakeoverRequests.put(requestId, takeoverRequest);
-            try {
-                messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
-            } catch (Exception e) {
-                /**
-                 * if we fail to send the request, it means the NC we tried to send the request to
-                 * has failed. When the failure notification arrives, we will send any pending request
-                 * that belongs to the failed NC to a different active replica.
-                 */
-                LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
-                e.printStackTrace();
+            if (partitionRecoveryPlan.size() == 0) {
+                //no active replicas were found for the failed node
+                LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+                return;
+            } else {
+                LOGGER.info("Partitions to recover: " + lostPartitions);
+            }
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                    .getCCApplicationContext().getMessageBroker();
+            //For each replica, send a request to takeover the assigned partitions
+            for (String replica : partitionRecoveryPlan.keySet()) {
+                Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+                long requestId = clusterRequestId++;
+                TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
+                        replica, partitionsToTakeover);
+                pendingTakeoverRequests.put(requestId, takeoverRequest);
+                try {
+                    messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+                } catch (Exception e) {
+                    /**
+                     * if we fail to send the request, it means the NC we tried to send the request to
+                     * has failed. When the failure notification arrives, we will send any pending request
+                     * that belongs to the failed NC to a different active replica.
+                     */
+                    LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
+                    e.printStackTrace();
+                }
             }
         }
     }
@@ -368,7 +430,6 @@ public class AsterixClusterProperties {
         for (Long requestId : failedTakeoverRequests) {
             pendingTakeoverRequests.remove(requestId);
         }
-
         return nodePartitions;
     }
 
@@ -406,19 +467,223 @@ public class AsterixClusterProperties {
 
     public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
         currentMetadataNode = reponse.getNodeId();
-        isMetadataNodeActive = true;
+        metadataNodeActive = true;
         LOGGER.info("Current metadata node: " + currentMetadataNode);
         updateClusterState();
     }
 
-    public synchronized String getCurrentMetadataNode() {
-        return currentMetadataNode;
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+                .getReplicationProperties();
+        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition original node is the returning node,
+                 * add it to the list of the partitions which will be failed back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
     }
 
-    public boolean isAutoFailoverEnabled() {
-        if (cluster != null && cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
-            return cluster.getDataReplication().isAutoFailover();
+    private synchronized void processPendingFailbackPlans() {
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /**
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) {
+                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
+                        clusterPartition.setActive(false);
+                        //partition expected to be returned to the failing back node
+                        clusterPartition.setActiveNodeId(failbackNode);
+                    }
+
+                    /**
+                     * if the returning node is the original metadata node,
+                     * then metadata node will change after the failback completes
+                     */
+                    String originalMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                            .getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                    }
+
+                    //force new jobs to wait
+                    state = ClusterState.REBALANCING;
+
+                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                            .getCCApplicationContext().getMessageBroker();
+                    //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+                    Set<PreparePartitionsFailbackRequestMessage> planFailbackRequests = plan.getPlanFailbackRequests();
+                    for (PreparePartitionsFailbackRequestMessage request : planFailbackRequests) {
+                        try {
+                            messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+                            plan.addPendingRequest(request);
+                        } catch (Exception e) {
+                            LOGGER.warning("Failed to send failback request to: " + request.getNodeID());
+                            e.printStackTrace();
+                            plan.notifyNodeFailure(request.getNodeID());
+                            revertFailedFailbackPlanEffects();
+                            break;
+                        }
+                    }
+                    /**
+                     * wait until the current plan is completed before processing the next plan.
+                     * when the current one completes or is reverted, the cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will be processed.
+                     */
+                    break;
+                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        plan.markRequestCompleted(msg.getRequestId());
+        /**
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back node
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                    .getCCApplicationContext().getMessageBroker();
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.warning("Failed to send complete failback request to: " + request.getNodeId());
+                e.printStackTrace();
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
+        /**
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId());
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        updateNodePartitions(nodeId, true);
+    }
+
+    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+                .getReplicationProperties();
+        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
+        String nodeIdAddress = "";
+        //in case the node joined with a new IP address, we need to send it to the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getMessageBroker();
+        for (String replica : remoteReplicas) {
+            //if the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    public synchronized boolean isMetadataNodeActive() {
+        return metadataNodeActive;
+    }
+
+    public boolean isReplicationEnabled() {
+        if (cluster != null && cluster.getDataReplication() != null) {
+            return cluster.getDataReplication().isEnabled();
         }
         return false;
     }
+
+    public boolean isAutoFailoverEnabled() {
+        return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
+    }
+
+    public synchronized JSONObject getClusterStateDescription() throws JSONException {
+        JSONObject stateDescription = new JSONObject();
+        stateDescription.put("State", state.name());
+        stateDescription.put("Metadata_Node", currentMetadataNode);
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            stateDescription.put("partition_" + partition.getPartitionId(), partition.getActiveNodeId());
+        }
+        return stateDescription;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
index 647a6a3..7502737 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
@@ -26,10 +26,12 @@ import java.util.HashSet;
 import java.util.Set;
 
 public class ReplicaFilesRequest {
-    Set<String> replicaIds;
+    private final Set<String> replicaIds;
+    private final Set<String> existingFiles;
 
-    public ReplicaFilesRequest(Set<String> replicaIds) {
+    public ReplicaFilesRequest(Set<String> replicaIds, Set<String> existingFiles) {
         this.replicaIds = replicaIds;
+        this.existingFiles = existingFiles;
     }
 
     public void serialize(OutputStream out) throws IOException {
@@ -38,6 +40,10 @@ public class ReplicaFilesRequest {
         for (String replicaId : replicaIds) {
             dos.writeUTF(replicaId);
         }
+        dos.writeInt(existingFiles.size());
+        for (String fileName : existingFiles) {
+            dos.writeUTF(fileName);
+        }
     }
 
     public static ReplicaFilesRequest create(DataInput input) throws IOException {
@@ -46,15 +52,19 @@ public class ReplicaFilesRequest {
         for (int i = 0; i < size; i++) {
             replicaIds.add(input.readUTF());
         }
-
-        return new ReplicaFilesRequest(replicaIds);
+        int filesCount = input.readInt();
+        Set<String> existingFiles = new HashSet<String>(filesCount);
+        for (int i = 0; i < filesCount; i++) {
+            existingFiles.add(input.readUTF());
+        }
+        return new ReplicaFilesRequest(replicaIds, existingFiles);
     }
 
     public Set<String> getReplicaIds() {
         return replicaIds;
     }
 
-    public void setReplicaIds(Set<String> replicaIds) {
-        this.replicaIds = replicaIds;
+    public Set<String> getExistingFiles() {
+        return existingFiles;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 790df66..d2380c1 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
-import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.management.NetworkingUtil;
@@ -52,7 +51,6 @@ public class ReplicationProtocol {
      * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
      * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
      * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
-     * UPDATE_REPLICA: used to update replica info such as IP Address change.
      * GOODBYE: used to notify replicas that the replication request has been completed
      * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
      * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
@@ -67,7 +65,6 @@ public class ReplicationProtocol {
         GET_REPLICA_LOGS,
         GET_REPLICA_MAX_LSN,
         GET_REPLICA_MIN_LSN,
-        UPDATE_REPLICA,
         GOODBYE,
         REPLICA_EVENT,
         LSM_COMPONENT_PROPERTIES,
@@ -115,8 +112,7 @@ public class ReplicationProtocol {
         //read replication request type
         NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
 
-        ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
-                .getInt()];
+        ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer.getInt()];
         return requestType;
     }
 
@@ -215,21 +211,6 @@ public class ReplicationProtocol {
         return requestBuffer;
     }
 
-    public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-
-        oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
-        replica.writeFields(oos);
-        oos.close();
-
-        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-        buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        return buffer;
-    }
-
     public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         DataOutputStream oos = new DataOutputStream(outputStream);
@@ -244,12 +225,6 @@ public class ReplicationProtocol {
         return buffer;
     }
 
-    public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return Replica.create(dis);
-    }
-
     public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
         ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
         DataInputStream dis = new DataInputStream(bais);
@@ -257,22 +232,24 @@ public class ReplicationProtocol {
         return ReplicaEvent.create(dis);
     }
 
-    public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
+    public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request)
+            throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            request.serialize(oos);
+
+            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+            if (buffer.capacity() < requestSize) {
+                buffer = ByteBuffer.allocate(requestSize);
+            } else {
+                buffer.clear();
+            }
+            buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
+            buffer.putInt(oos.size());
+            buffer.put(outputStream.toByteArray());
+            buffer.flip();
+            return buffer;
         }
-        buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
     }
 
     public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
deleted file mode 100644
index 9915c83..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.management;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-
-public class ReplicaEventNotifier implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(ReplicaEventNotifier.class.getName());
-
-    final int WAIT_TIME = 2000;
-    final Set<Replica> notifyReplicaNodes;
-
-    int notificationTimeOut;
-
-    final ReplicaEvent event;
-    final AsterixReplicationProperties asterixReplicationProperties;
-
-    public ReplicaEventNotifier(ReplicaEvent event, AsterixReplicationProperties asterixReplicationProperties) {
-        this.event = event;
-        this.asterixReplicationProperties = asterixReplicationProperties;
-        notificationTimeOut = asterixReplicationProperties.getReplicationTimeOut();
-        notifyReplicaNodes = asterixReplicationProperties.getRemoteReplicas(event.getReplica().getId());
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("ReplicaEventNotifier Thread");
-
-        if (notifyReplicaNodes == null) {
-            return;
-        }
-
-        ByteBuffer buffer = null;
-        try {
-            buffer = ReplicationProtocol.writeReplicaEventRequest(event);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        for (Replica replica : notifyReplicaNodes) {
-            long startTime = System.currentTimeMillis();
-            InetSocketAddress replicaAddress = replica.getAddress(asterixReplicationProperties);
-            SocketChannel connection = null;
-
-            while (true) {
-                try {
-                    connection = SocketChannel.open();
-                    connection.configureBlocking(true);
-                    connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
-                    //send replica event
-                    connection.write(buffer);
-                    //send goodbye
-                    connection.write(ReplicationProtocol.getGoodbyeBuffer());
-                    break;
-                } catch (IOException | UnresolvedAddressException e) {
-                    try {
-                        Thread.sleep(WAIT_TIME);
-                    } catch (InterruptedException e1) {
-                        //ignore
-                    }
-
-                    //check if connection to replica timed out
-                    if (((System.currentTimeMillis() - startTime) / 1000) >= notificationTimeOut) {
-                        LOGGER.log(Level.WARNING, "Could not send ReplicaEvent to " + replica);
-                        break;
-                    }
-                } finally {
-                    if (connection.isOpen()) {
-                        try {
-                            connection.close();
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                    buffer.position(0);
-                }
-            }
-        }
-    }
-}