You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/02/18 10:54:30 UTC
[4/6] incubator-asterixdb git commit: Asterix NCs Failback Support
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql
new file mode 100644
index 0000000..bd01d99
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.5.cstate.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover complete, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql
new file mode 100644
index 0000000..b09c3d3
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.6.query.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover complete, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql
new file mode 100644
index 0000000..56a88a2
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.7.update.aql
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover complete, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
+use dataverse TinySocial;
+
+/* insert ids 11-20 */
+insert into dataset TinySocial.FacebookUsersInMemory {"id":11,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":12,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":13,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":14,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":15,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":16,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":17,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":18,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":19,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
+
+insert into dataset TinySocial.FacebookUsersInMemory {"id":20,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql
new file mode 100644
index 0000000..67b492c
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.8.vmgx.aql
@@ -0,0 +1 @@
+startnode -n asterix -nodes nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql
new file mode 100644
index 0000000..1746da6
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.9.sleep.aql
@@ -0,0 +1 @@
+10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
index ae14ad0..94ecc27 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
@@ -29,4 +29,4 @@
use dataverse TinySocial;
load dataset FacebookUsers using localfs
-(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
deleted file mode 100644
index 5695ed7..0000000
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
+++ /dev/null
@@ -1 +0,0 @@
-nc2 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vscript.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
index 8087689..d97f786 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
@@ -29,6 +29,6 @@
use dataverse TinySocial;
load dataset FacebookUsers using localfs
-(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
deleted file mode 100644
index 5695ed7..0000000
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
+++ /dev/null
@@ -1 +0,0 @@
-nc2 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vscript.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
deleted file mode 100644
index 5eec164..0000000
--- a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
+++ /dev/null
@@ -1 +0,0 @@
-nc1 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vscript.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
new file mode 100644
index 0000000..61322c9
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
@@ -0,0 +1 @@
+{"State":"ACTIVE","Metadata_Node":"asterix_nc1","partition_0":"asterix_nc1","partition_1":"asterix_nc1","partition_2":"asterix_nc2","partition_3":"asterix_nc2"}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
new file mode 100644
index 0000000..587a97a
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
@@ -0,0 +1 @@
+{"State":"ACTIVE","Metadata_Node":"asterix_nc2","partition_0":"asterix_nc2","partition_1":"asterix_nc2","partition_2":"asterix_nc2","partition_3":"asterix_nc2"}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm
new file mode 100644
index 0000000..2edeafb
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.11.adm
@@ -0,0 +1 @@
+20
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm
new file mode 100644
index 0000000..9a03714
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.query.6.adm
@@ -0,0 +1 @@
+10
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
index f033086..36c3992 100644
--- a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -17,21 +17,28 @@
! under the License.
!-->
<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
- <test-group name="failover">
- <test-case FilePath="failover">
- <compilation-unit name="bulkload">
- <output-dir compare="Text">bulkload</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="failover">
- <compilation-unit name="mem_component_recovery">
- <output-dir compare="Text">mem_component_recovery</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="failover">
- <compilation-unit name="metadata_node">
- <output-dir compare="Text">metadata_node</output-dir>
- </compilation-unit>
- </test-case>
- </test-group>
+ <test-group name="failover">
+ <test-case FilePath="failover">
+ <compilation-unit name="bulkload">
+ <output-dir compare="Text">bulkload</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failover">
+ <compilation-unit name="mem_component_recovery">
+ <output-dir compare="Text">mem_component_recovery</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failover">
+ <compilation-unit name="metadata_node">
+ <output-dir compare="Text">metadata_node</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <test-group name="failback">
+ <test-case FilePath="failback">
+ <compilation-unit name="node_failback">
+ <output-dir compare="Text">node_failback</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 4e6a3df..088a85b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -28,6 +28,7 @@ import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -113,6 +114,7 @@ public class MetadataNode implements IMetadataNode {
private IDatasetLifecycleManager datasetLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
+ private int metadataStoragePartition;
public static final MetadataNode INSTANCE = new MetadataNode();
@@ -123,6 +125,8 @@ public class MetadataNode implements IMetadataNode {
public void initialize(IAsterixAppRuntimeContext runtimeContext) {
this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
this.datasetLifecycleManager = runtimeContext.getDatasetLifecycleManager();
+ this.metadataStoragePartition = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties()
+ .getMetadataPartition().getPartitionId();
}
@Override
@@ -305,11 +309,11 @@ public class MetadataNode implements IMetadataNode {
if (metadataIndex.isPrimaryIndex()) {
return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
- transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+ transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
} else {
return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
- transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
+ transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
}
}
@@ -641,8 +645,7 @@ public class MetadataNode implements IMetadataNode {
}
}
- private List<Datatype> getDataverseDatatypes(JobId jobId, String dataverseName)
- throws MetadataException, RemoteException {
+ private List<Datatype> getDataverseDatatypes(JobId jobId, String dataverseName) throws MetadataException {
try {
ITupleReference searchKey = createTuple(dataverseName);
DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false);
@@ -673,7 +676,7 @@ public class MetadataNode implements IMetadataNode {
}
}
- public List<Dataset> getAllDatasets(JobId jobId) throws MetadataException, RemoteException {
+ public List<Dataset> getAllDatasets(JobId jobId) throws MetadataException {
try {
ITupleReference searchKey = null;
DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false);
@@ -686,7 +689,7 @@ public class MetadataNode implements IMetadataNode {
}
}
- public List<Datatype> getAllDatatypes(JobId jobId) throws MetadataException, RemoteException {
+ public List<Datatype> getAllDatatypes(JobId jobId) throws MetadataException {
try {
ITupleReference searchKey = null;
DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false);
@@ -699,8 +702,7 @@ public class MetadataNode implements IMetadataNode {
}
}
- private void confirmDataverseCanBeDeleted(JobId jobId, String dataverseName)
- throws MetadataException, RemoteException {
+ private void confirmDataverseCanBeDeleted(JobId jobId, String dataverseName) throws MetadataException {
//If a dataset from a DIFFERENT dataverse
//uses a type from this dataverse
//throw an error
@@ -717,13 +719,13 @@ public class MetadataNode implements IMetadataNode {
}
private void confirmDatatypeIsUnused(JobId jobId, String dataverseName, String datatypeName)
- throws MetadataException, RemoteException {
+ throws MetadataException {
confirmDatatypeIsUnusedByDatatypes(jobId, dataverseName, datatypeName);
confirmDatatypeIsUnusedByDatasets(jobId, dataverseName, datatypeName);
}
private void confirmDatatypeIsUnusedByDatasets(JobId jobId, String dataverseName, String datatypeName)
- throws MetadataException, RemoteException {
+ throws MetadataException {
//If any dataset uses this type, throw an error
List<Dataset> datasets = getAllDatasets(jobId);
for (Dataset set : datasets) {
@@ -735,7 +737,7 @@ public class MetadataNode implements IMetadataNode {
}
private void confirmDatatypeIsUnusedByDatatypes(JobId jobId, String dataverseName, String datatypeName)
- throws MetadataException, RemoteException {
+ throws MetadataException {
//If any datatype uses this type, throw an error
//TODO: Currently this loads all types into memory. This will need to be fixed for large numbers of types
List<Datatype> datatypes = getAllDatatypes(jobId);
@@ -768,7 +770,7 @@ public class MetadataNode implements IMetadataNode {
}
public List<String> getDatasetNamesPartitionedOnThisNodeGroup(JobId jobId, String nodegroup)
- throws MetadataException, RemoteException {
+ throws MetadataException {
//this needs to scan the datasets and return the datasets that use this nodegroup
List<String> nodeGroupDatasets = new ArrayList<String>();
List<Dataset> datasets = getAllDatasets(jobId);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 2744630..f79385c 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,14 +38,24 @@ import javax.xml.bind.Unmarshaller;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
+import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.common.messaging.ReplicaEventMessage;
import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.NodeFailbackPlan;
+import org.apache.asterix.common.replication.NodeFailbackPlan.FailbackPlanState;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.json.JSONException;
+import org.json.JSONObject;
/**
* A holder class for properties related to the Asterix cluster.
@@ -57,15 +69,16 @@ public class AsterixClusterProperties {
*/
private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
-
public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
+ private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
private static final String IO_DEVICES = "iodevices";
private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
- private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
+ private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<String, Map<String, String>>();
private final Cluster cluster;
+ private ClusterState state = ClusterState.UNUSABLE;
private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
@@ -75,10 +88,14 @@ public class AsterixClusterProperties {
private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
- private long takeoverRequestId = 0;
+ private long clusterRequestId = 0;
private String currentMetadataNode = null;
- private boolean isMetadataNodeActive = false;
+ private boolean metadataNodeActive = false;
private boolean autoFailover = false;
+ private boolean replicationEnabled = false;
+ private Set<String> failedNodes = new HashSet<>();
+ private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
+ private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
private AsterixClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -99,43 +116,73 @@ public class AsterixClusterProperties {
node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
- if (isAutoFailoverEnabled()) {
- autoFailover = cluster.getDataReplication().isAutoFailover();
- }
+ replicationEnabled = isReplicationEnabled();
+ autoFailover = isAutoFailoverEnabled();
if (autoFailover) {
pendingTakeoverRequests = new HashMap<>();
+ pendingProcessingFailbackPlans = new LinkedList<>();
+ planId2FailbackPlanMap = new HashMap<>();
}
}
}
}
- private ClusterState state = ClusterState.UNUSABLE;
-
public synchronized void removeNCConfiguration(String nodeId) {
- updateNodePartitions(nodeId, false);
- ncConfiguration.remove(nodeId);
- if (nodeId.equals(currentMetadataNode)) {
- isMetadataNodeActive = false;
- LOGGER.info("Metadata node is now inactive");
- }
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " + nodeId);
}
- if (autoFailover) {
- requestPartitionsTakeover(nodeId);
+ activeNcConfiguration.remove(nodeId);
+
+ //if this node was waiting for failback and failed before it completed
+ if (failedNodes.contains(nodeId)) {
+ if (autoFailover) {
+ notifyFailbackPlansNodeFailure(nodeId);
+ revertFailedFailbackPlanEffects();
+ }
+ } else {
+ //an active node failed
+ failedNodes.add(nodeId);
+ if (nodeId.equals(currentMetadataNode)) {
+ metadataNodeActive = false;
+ LOGGER.info("Metadata node is now inactive");
+ }
+ updateNodePartitions(nodeId, false);
+ if (replicationEnabled) {
+ notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+ if (autoFailover) {
+ notifyFailbackPlansNodeFailure(nodeId);
+ requestPartitionsTakeover(nodeId);
+ }
+ }
}
}
public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
- ncConfiguration.put(nodeId, configuration);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registering configuration parameters for node id " + nodeId);
+ }
+ activeNcConfiguration.put(nodeId, configuration);
+
+ //a node trying to come back after failure
+ if (failedNodes.contains(nodeId)) {
+ if (autoFailover) {
+ prepareFailbackPlan(nodeId);
+ return;
+ } else {
+ //a node completed local or remote recovery and rejoined
+ failedNodes.remove(nodeId);
+ if (replicationEnabled) {
+ //notify other replicas to reconnect to this node
+ notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+ }
+ }
+ }
+
if (nodeId.equals(currentMetadataNode)) {
- isMetadataNodeActive = true;
+ metadataNodeActive = true;
LOGGER.info("Metadata node is now active");
}
updateNodePartitions(nodeId, true);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" Registering configuration parameters for node id " + nodeId);
- }
}
private synchronized void updateNodePartitions(String nodeId, boolean added) {
@@ -163,11 +210,17 @@ public class AsterixClusterProperties {
}
}
//if all storage partitions are active as well as the metadata node, then the cluster is active
- if (isMetadataNodeActive) {
+ if (metadataNodeActive) {
state = ClusterState.ACTIVE;
LOGGER.info("Cluster is now ACTIVE");
//start global recovery
AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
+ if (autoFailover) {
+ //if there are any pending failback requests, process them
+ if (pendingProcessingFailbackPlans.size() > 0) {
+ processPendingFailbackPlans();
+ }
+ }
} else {
requestMetadataNodeTakeover();
}
@@ -196,7 +249,7 @@ public class AsterixClusterProperties {
* if it does not correspond to the set of registered Node Controllers.
*/
public synchronized String[] getIODevices(String nodeId) {
- Map<String, String> ncConfig = ncConfiguration.get(nodeId);
+ Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
if (ncConfig == null) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -222,7 +275,7 @@ public class AsterixClusterProperties {
public synchronized Set<String> getParticipantNodes() {
Set<String> participantNodes = new HashSet<String>();
- for (String pNode : ncConfiguration.keySet()) {
+ for (String pNode : activeNcConfiguration.keySet()) {
participantNodes.add(pNode);
}
return participantNodes;
@@ -254,12 +307,12 @@ public class AsterixClusterProperties {
this.globalRecoveryCompleted = globalRecoveryCompleted;
}
- public static boolean isClusterActive() {
- if (AsterixClusterProperties.INSTANCE.getCluster() == null) {
+ public boolean isClusterActive() {
+ if (cluster == null) {
// this is a virtual cluster
return true;
}
- return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
+ return state == ClusterState.ACTIVE;
}
public static int getNumberOfNodes() {
@@ -279,8 +332,8 @@ public class AsterixClusterProperties {
public synchronized ClusterPartition[] getClusterPartitons() {
ArrayList<ClusterPartition> partitons = new ArrayList<>();
- for (ClusterPartition cluster : clusterPartitions.values()) {
- partitons.add(cluster);
+ for (ClusterPartition partition : clusterPartitions.values()) {
+ partitons.add(partition);
}
return partitons.toArray(new ClusterPartition[] {});
}
@@ -301,44 +354,53 @@ public class AsterixClusterProperties {
//collect the partitions of the failed NC
List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
- for (ClusterPartition partition : lostPartitions) {
- //find replicas for this partitions
- Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
- //find a replica that is still active
- for (String replica : partitionReplicas) {
- //TODO (mhubail) currently this assigns the partition to the first found active replica.
- //It needs to be modified to consider load balancing.
- if (ncConfiguration.containsKey(replica)) {
- if (!partitionRecoveryPlan.containsKey(replica)) {
- List<Integer> replicaPartitions = new ArrayList<>();
- replicaPartitions.add(partition.getPartitionId());
- partitionRecoveryPlan.put(replica, replicaPartitions);
- } else {
- partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+ if (lostPartitions.size() > 0) {
+ for (ClusterPartition partition : lostPartitions) {
+ //find replicas for this partition
+ Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+ //find a replica that is still active
+ for (String replica : partitionReplicas) {
+ //TODO (mhubail) currently this assigns the partition to the first found active replica.
+ //It needs to be modified to consider load balancing.
+ if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+ if (!partitionRecoveryPlan.containsKey(replica)) {
+ List<Integer> replicaPartitions = new ArrayList<>();
+ replicaPartitions.add(partition.getPartitionId());
+ partitionRecoveryPlan.put(replica, replicaPartitions);
+ } else {
+ partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+ }
}
}
}
- }
- ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
- .getCCApplicationContext().getMessageBroker();
- //For each replica, send a request to takeover the assigned partitions
- for (String replica : partitionRecoveryPlan.keySet()) {
- Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
- long requestId = takeoverRequestId++;
- TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, replica,
- failedNodeId, partitionsToTakeover);
- pendingTakeoverRequests.put(requestId, takeoverRequest);
- try {
- messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
- } catch (Exception e) {
- /**
- * if we fail to send the request, it means the NC we tried to send the request to
- * has failed. When the failure notification arrives, we will send any pending request
- * that belongs to the failed NC to a different active replica.
- */
- LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
- e.printStackTrace();
+ if (partitionRecoveryPlan.size() == 0) {
+ //no active replicas were found for the failed node
+ LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+ return;
+ } else {
+ LOGGER.info("Partitions to recover: " + lostPartitions);
+ }
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getMessageBroker();
+ //For each replica, send a request to takeover the assigned partitions
+ for (String replica : partitionRecoveryPlan.keySet()) {
+ Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+ long requestId = clusterRequestId++;
+ TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
+ replica, partitionsToTakeover);
+ pendingTakeoverRequests.put(requestId, takeoverRequest);
+ try {
+ messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+ } catch (Exception e) {
+ /**
+ * if we fail to send the request, it means the NC we tried to send the request to
+ * has failed. When the failure notification arrives, we will send any pending request
+ * that belongs to the failed NC to a different active replica.
+ */
+ LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
+ e.printStackTrace();
+ }
}
}
}
@@ -368,7 +430,6 @@ public class AsterixClusterProperties {
for (Long requestId : failedTakeoverRequests) {
pendingTakeoverRequests.remove(requestId);
}
-
return nodePartitions;
}
@@ -406,19 +467,223 @@ public class AsterixClusterProperties {
public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
currentMetadataNode = reponse.getNodeId();
- isMetadataNodeActive = true;
+ metadataNodeActive = true;
LOGGER.info("Current metadata node: " + currentMetadataNode);
updateClusterState();
}
- public synchronized String getCurrentMetadataNode() {
- return currentMetadataNode;
+ private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+ NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+ pendingProcessingFailbackPlans.add(plan);
+ planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+ //get all partitions this node requires to resync
+ AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+ .getReplicationProperties();
+ Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
+ for (String replicaId : nodeReplicas) {
+ ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
+ for (ClusterPartition partition : nodePartitions) {
+ plan.addParticipant(partition.getActiveNodeId());
+ /**
+ * if the partition original node is the returning node,
+ * add it to the list of the partitions which will be failed back
+ */
+ if (partition.getNodeId().equals(failingBackNodeId)) {
+ plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+ }
+ }
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Prepared Failback plan: " + plan.toString());
+ }
+
+ processPendingFailbackPlans();
}
- public boolean isAutoFailoverEnabled() {
- if (cluster != null && cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
- return cluster.getDataReplication().isAutoFailover();
+ private synchronized void processPendingFailbackPlans() {
+ /**
+ * if the cluster state is not ACTIVE, then failbacks should not be processed
+ * since some partitions are not active
+ */
+ if (state == ClusterState.ACTIVE) {
+ while (!pendingProcessingFailbackPlans.isEmpty()) {
+ //take the first pending failback plan
+ NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+ /**
+ * A plan at this stage will be in one of two states:
+ * 1. PREPARING -> the participants were selected but we haven't sent any request.
+ * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
+ */
+ if (plan.getState() == FailbackPlanState.PREPARING) {
+ //set the partitions that will be failed back as inactive
+ String failbackNode = plan.getNodeId();
+ for (Integer partitionId : plan.getPartitionsToFailback()) {
+ ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
+ clusterPartition.setActive(false);
+ //partition expected to be returned to the failing back node
+ clusterPartition.setActiveNodeId(failbackNode);
+ }
+
+ /**
+ * if the returning node is the original metadata node,
+ * then metadata node will change after the failback completes
+ */
+ String originalMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties()
+ .getMetadataNodeName();
+ if (originalMetadataNode.equals(failbackNode)) {
+ plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+ currentMetadataNode = "";
+ metadataNodeActive = false;
+ }
+
+ //force new jobs to wait
+ state = ClusterState.REBALANCING;
+
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getMessageBroker();
+ //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+ Set<PreparePartitionsFailbackRequestMessage> planFailbackRequests = plan.getPlanFailbackRequests();
+ for (PreparePartitionsFailbackRequestMessage request : planFailbackRequests) {
+ try {
+ messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+ plan.addPendingRequest(request);
+ } catch (Exception e) {
+ LOGGER.warning("Failed to send failback request to: " + request.getNodeID());
+ e.printStackTrace();
+ plan.notifyNodeFailure(request.getNodeID());
+ revertFailedFailbackPlanEffects();
+ break;
+ }
+ }
+ /**
+ * wait until the current plan is completed before processing the next plan.
+ * when the current one completes or is reverted, the cluster state will be
+ * ACTIVE again, and the next failback plan (if any) will be processed.
+ */
+ break;
+ } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+ //this plan failed before sending any requests -> nothing to rollback
+ planId2FailbackPlanMap.remove(plan.getPlanId());
+ }
+ }
+ }
+ }
+
+ public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
+ NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+ plan.markRequestCompleted(msg.getRequestId());
+ /**
+ * A plan at this stage will be in one of three states:
+ * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+ * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+ * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+ */
+ if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+ CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+ //send complete resync and takeover partitions to the failing back node
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getMessageBroker();
+ try {
+ messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+ } catch (Exception e) {
+ LOGGER.warning("Failed to send complete failback request to: " + request.getNodeId());
+ e.printStackTrace();
+ notifyFailbackPlansNodeFailure(request.getNodeId());
+ revertFailedFailbackPlanEffects();
+ }
+ } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+ revertFailedFailbackPlanEffects();
+ }
+ }
+
+ public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
+ /**
+ * the failback plan completed successfully:
+ * Remove all references to it.
+ * Remove the failing back node from the failed nodes list.
+ * Notify its replicas to reconnect to it.
+ * Set the failing back node partitions as active.
+ */
+ NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId());
+ String nodeId = plan.getNodeId();
+ failedNodes.remove(nodeId);
+ //notify impacted replicas they can reconnect to this node
+ notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+ updateNodePartitions(nodeId, true);
+ }
+
+ private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
+ AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+ .getReplicationProperties();
+ Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
+ String nodeIdAddress = "";
+ //in case the node joined with a new IP address, we need to send it to the other replicas
+ if (event == ClusterEventType.NODE_JOIN) {
+ nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+ }
+
+ ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getMessageBroker();
+ for (String replica : remoteReplicas) {
+ //if the remote replica is alive, send the event
+ if (activeNcConfiguration.containsKey(replica)) {
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, replica);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private synchronized void revertFailedFailbackPlanEffects() {
+ Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+ while (iterator.hasNext()) {
+ NodeFailbackPlan plan = iterator.next();
+ if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+ //TODO if the failing back node is still active, notify it to construct a new plan for it
+ iterator.remove();
+
+ //reassign the partitions that were supposed to be failed back to an active replica
+ requestPartitionsTakeover(plan.getNodeId());
+ }
+ }
+ }
+
+ private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+ Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+ while (iterator.hasNext()) {
+ NodeFailbackPlan plan = iterator.next();
+ plan.notifyNodeFailure(nodeId);
+ }
+ }
+
+ public synchronized boolean isMetadataNodeActive() {
+ return metadataNodeActive;
+ }
+
+ public boolean isReplicationEnabled() {
+ if (cluster != null && cluster.getDataReplication() != null) {
+ return cluster.getDataReplication().isEnabled();
}
return false;
}
+
+ public boolean isAutoFailoverEnabled() {
+ return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
+ }
+
+ public synchronized JSONObject getClusterStateDescription() throws JSONException {
+ JSONObject stateDescription = new JSONObject();
+ stateDescription.put("State", state.name());
+ stateDescription.put("Metadata_Node", currentMetadataNode);
+ for (ClusterPartition partition : clusterPartitions.values()) {
+ stateDescription.put("partition_" + partition.getPartitionId(), partition.getActiveNodeId());
+ }
+ return stateDescription;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
index 647a6a3..7502737 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
@@ -26,10 +26,12 @@ import java.util.HashSet;
import java.util.Set;
public class ReplicaFilesRequest {
- Set<String> replicaIds;
+ private final Set<String> replicaIds;
+ private final Set<String> existingFiles;
- public ReplicaFilesRequest(Set<String> replicaIds) {
+ public ReplicaFilesRequest(Set<String> replicaIds, Set<String> existingFiles) {
this.replicaIds = replicaIds;
+ this.existingFiles = existingFiles;
}
public void serialize(OutputStream out) throws IOException {
@@ -38,6 +40,10 @@ public class ReplicaFilesRequest {
for (String replicaId : replicaIds) {
dos.writeUTF(replicaId);
}
+ dos.writeInt(existingFiles.size());
+ for (String fileName : existingFiles) {
+ dos.writeUTF(fileName);
+ }
}
public static ReplicaFilesRequest create(DataInput input) throws IOException {
@@ -46,15 +52,19 @@ public class ReplicaFilesRequest {
for (int i = 0; i < size; i++) {
replicaIds.add(input.readUTF());
}
-
- return new ReplicaFilesRequest(replicaIds);
+ int filesCount = input.readInt();
+ Set<String> existingFiles = new HashSet<String>(filesCount);
+ for (int i = 0; i < filesCount; i++) {
+ existingFiles.add(input.readUTF());
+ }
+ return new ReplicaFilesRequest(replicaIds, existingFiles);
}
public Set<String> getReplicaIds() {
return replicaIds;
}
- public void setReplicaIds(Set<String> replicaIds) {
- this.replicaIds = replicaIds;
+ public Set<String> getExistingFiles() {
+ return existingFiles;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 790df66..d2380c1 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.replication.management.NetworkingUtil;
@@ -52,7 +51,6 @@ public class ReplicationProtocol {
* GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
* GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
* GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
- * UPDATE_REPLICA: used to update replica info such as IP Address change.
* GOODBYE: used to notify replicas that the replication request has been completed
* REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
* LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
@@ -67,7 +65,6 @@ public class ReplicationProtocol {
GET_REPLICA_LOGS,
GET_REPLICA_MAX_LSN,
GET_REPLICA_MIN_LSN,
- UPDATE_REPLICA,
GOODBYE,
REPLICA_EVENT,
LSM_COMPONENT_PROPERTIES,
@@ -115,8 +112,7 @@ public class ReplicationProtocol {
//read replication request type
NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
- ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
- .getInt()];
+ ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer.getInt()];
return requestType;
}
@@ -215,21 +211,6 @@ public class ReplicationProtocol {
return requestBuffer;
}
- public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
-
- oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
- replica.writeFields(oos);
- oos.close();
-
- ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
- buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- return buffer;
- }
-
public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutputStream oos = new DataOutputStream(outputStream);
@@ -244,12 +225,6 @@ public class ReplicationProtocol {
return buffer;
}
- public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return Replica.create(dis);
- }
-
public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
DataInputStream dis = new DataInputStream(bais);
@@ -257,22 +232,24 @@ public class ReplicationProtocol {
return ReplicaEvent.create(dis);
}
- public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
+ public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request)
+ throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- request.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (buffer.capacity() < requestSize) {
- buffer = ByteBuffer.allocate(requestSize);
- } else {
- buffer.clear();
+ try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+ request.serialize(oos);
+
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (buffer.capacity() < requestSize) {
+ buffer = ByteBuffer.allocate(requestSize);
+ } else {
+ buffer.clear();
+ }
+ buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
}
- buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
}
public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
deleted file mode 100644
index 9915c83..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.management;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-
-public class ReplicaEventNotifier implements Runnable {
-
- private static final Logger LOGGER = Logger.getLogger(ReplicaEventNotifier.class.getName());
-
- final int WAIT_TIME = 2000;
- final Set<Replica> notifyReplicaNodes;
-
- int notificationTimeOut;
-
- final ReplicaEvent event;
- final AsterixReplicationProperties asterixReplicationProperties;
-
- public ReplicaEventNotifier(ReplicaEvent event, AsterixReplicationProperties asterixReplicationProperties) {
- this.event = event;
- this.asterixReplicationProperties = asterixReplicationProperties;
- notificationTimeOut = asterixReplicationProperties.getReplicationTimeOut();
- notifyReplicaNodes = asterixReplicationProperties.getRemoteReplicas(event.getReplica().getId());
- }
-
- @Override
- public void run() {
- Thread.currentThread().setName("ReplicaEventNotifier Thread");
-
- if (notifyReplicaNodes == null) {
- return;
- }
-
- ByteBuffer buffer = null;
- try {
- buffer = ReplicationProtocol.writeReplicaEventRequest(event);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- for (Replica replica : notifyReplicaNodes) {
- long startTime = System.currentTimeMillis();
- InetSocketAddress replicaAddress = replica.getAddress(asterixReplicationProperties);
- SocketChannel connection = null;
-
- while (true) {
- try {
- connection = SocketChannel.open();
- connection.configureBlocking(true);
- connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
- //send replica event
- connection.write(buffer);
- //send goodbye
- connection.write(ReplicationProtocol.getGoodbyeBuffer());
- break;
- } catch (IOException | UnresolvedAddressException e) {
- try {
- Thread.sleep(WAIT_TIME);
- } catch (InterruptedException e1) {
- //ignore
- }
-
- //check if connection to replica timed out
- if (((System.currentTimeMillis() - startTime) / 1000) >= notificationTimeOut) {
- LOGGER.log(Level.WARNING, "Could not send ReplicaEvent to " + replica);
- break;
- }
- } finally {
- if (connection.isOpen()) {
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- buffer.position(0);
- }
- }
- }
- }
-}