You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/02/18 10:54:31 UTC
[5/6] incubator-asterixdb git commit: Asterix NCs Failback Support
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
new file mode 100644
index 0000000..7909a35
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Set;
+
+public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final Set<Integer> partitions;
+ private boolean releaseMetadataNode = false;
+ private final String nodeID;
+
+ public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+ super(planId, requestId);
+ this.nodeID = nodeId;
+ this.partitions = partitions;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_REQUEST;
+ }
+
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+
+ public boolean isReleaseMetadataNode() {
+ return releaseMetadataNode;
+ }
+
+ public void setReleaseMetadataNode(boolean releaseMetadataNode) {
+ this.releaseMetadataNode = releaseMetadataNode;
+ }
+
+ public String getNodeID() {
+ return nodeID;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Plan ID: " + planId);
+ sb.append(" Partitions: " + partitions);
+ sb.append(" releaseMetadataNode: " + releaseMetadataNode);
+ return sb.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
new file mode 100644
index 0000000..6b058c7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Set;
+
+public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final Set<Integer> partitions;
+
+ public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+ super(planId, requestId);
+ this.partitions = partitions;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_RESPONSE;
+ }
+
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
new file mode 100644
index 0000000..28fd36f
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+
+public class ReplicaEventMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final ClusterEventType event;
+ private final String nodeIPAddress;
+
+ public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
+ this.nodeId = nodeId;
+ this.nodeIPAddress = nodeIPAddress;
+ this.event = event;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.REPLICA_EVENT;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public ClusterEventType getEvent() {
+ return event;
+ }
+
+ public String getNodeIPAddress() {
+ return nodeIPAddress;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
index abfa7d2..ad5234b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -22,15 +22,12 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
private static final long serialVersionUID = 1L;
private final Integer[] partitions;
- private final String failedNode;
private final long requestId;
private final String nodeId;
- public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
- Integer[] partitionsToTakeover) {
+ public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
this.requestId = requestId;
this.nodeId = nodeId;
- this.failedNode = failedNode;
this.partitions = partitionsToTakeover;
}
@@ -47,10 +44,6 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
return requestId;
}
- public String getFailedNode() {
- return failedNode;
- }
-
public String getNodeId() {
return nodeId;
}
@@ -60,7 +53,6 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
StringBuilder sb = new StringBuilder();
sb.append("Request ID: " + requestId);
sb.append(" Node ID: " + nodeId);
- sb.append(" Failed Node: " + failedNode);
sb.append(" Partitions: ");
for (Integer partitionId : partitions) {
sb.append(partitionId + ",");
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 57a0dae..5d2e263 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -30,7 +30,12 @@ public interface IApplicationMessage extends IMessage {
TAKEOVER_PARTITIONS_REQUEST,
TAKEOVER_PARTITIONS_RESPONSE,
TAKEOVER_METADATA_NODE_REQUEST,
- TAKEOVER_METADATA_NODE_RESPONSE
+ TAKEOVER_METADATA_NODE_RESPONSE,
+ PREPARE_PARTITIONS_FAILBACK_REQUEST,
+ PREPARE_PARTITIONS_FAILBACK_RESPONSE,
+ COMPLETE_FAILBACK_REQUEST,
+ COMPLETE_FAILBACK_RESPONSE,
+ REPLICA_EVENT
}
public abstract ApplicationMessageType getMessageType();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index ecc9494..6d5918f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -36,6 +36,17 @@ public interface IRemoteRecoveryManager {
* @throws IOException
* @throws ACIDException
*/
- public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+ public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException;
+ /**
+ * Constructs a failback plan and requests LSM disk components from active remote replicas.
+ */
+ public void startFailbackProcess();
+
+ /**
+ * Requests the remaining LSM disk component files from active remote replicas.
+ *
+ * @throws IOException
+ */
+ public void completeFailbackProcess() throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index f13d300..e22fafc 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -23,14 +23,8 @@ import java.util.Set;
public interface IReplicaResourcesManager {
/**
- * @param remoteNodes
- * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
- */
- public long getMinRemoteLSN(Set<String> remoteNodes);
-
- /**
* @param partitions
* @return the minimum LSN of all indexes that belong to {@code partitions}.
*/
- public long getPartitionsMinLSN(Integer[] partitions);
+ public long getPartitionsMinLSN(Set<Integer> partitions);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 76f8767..3fc2af0 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.common.replication;
+import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Set;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -54,12 +54,13 @@ public interface IReplicationManager extends IIOReplicationManager {
* Get logs that belong to those replicas.
* @param fromLSN
* Low water mark for logs to be requested.
- * @return The logs received that belong to the local node.
+ * @param recoveryLogsFile
+ * a temporary file to store the logs required for recovery
* @throws IOException
* @throws ACIDException
*/
- public ArrayList<ILogRecord> requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover,
- long fromLSN) throws IOException, ACIDException;
+ public void requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover, long fromLSN,
+ File recoveryLogsFile) throws IOException, ACIDException;
/**
* Requests LSM components files from a remote replica.
@@ -68,9 +69,12 @@ public interface IReplicationManager extends IIOReplicationManager {
* The replica id to send the request to.
* @param replicasDataToRecover
* Get files that belong to those replicas.
+ * @param existingFiles
+ * a list of already existing files on the requester
* @throws IOException
*/
- public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover) throws IOException;
+ public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover,
+ Set<String> existingFiles) throws IOException;
/**
* Requests current maximum LSN from remote replicas.
@@ -83,13 +87,6 @@ public interface IReplicationManager extends IIOReplicationManager {
public long getMaxRemoteLSN(Set<String> remoteReplicaIds) throws IOException;
/**
- * Sends the IP address of the local replica to all remote replicas.
- *
- * @throws IOException
- */
- public void broadcastNewIPAddress() throws IOException;
-
- /**
* @return The number of remote replicas that are in ACTIVE state.
*/
public int getActiveReplicasCount();
@@ -146,5 +143,4 @@ public interface IReplicationManager extends IIOReplicationManager {
* @throws IOException
*/
public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
new file mode 100644
index 0000000..0591644
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
+
+public class NodeFailbackPlan {
+
+ public enum FailbackPlanState {
+ /**
+ * Initial state while selecting the nodes that will participate
+ * in the node failback plan.
+ */
+ PREPARING,
+ /**
+ * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
+ * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
+ * that a response is expected and the plan must wait for it.
+ */
+ PENDING_PARTICIPANT_REPONSE,
+ /**
+ * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response,
+ * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
+ * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
+ */
+ PENDING_COMPLETION,
+ /**
+ * If any of the participants fails or the failing back node itself fails during
+ * any of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION),
+ * the state is changed to FAILED.
+ */
+ FAILED,
+ /**
+ * if the state is FAILED, and all pending responses (if any) have been received,
+ * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
+ * the effects of this plan (if any).
+ */
+ PENDING_ROLLBACK
+ }
+
+ private static long planIdGenerator = 0;
+ private long planId;
+ private final String nodeId;
+ private final Set<String> participants;
+ private final Map<Integer, String> partition2nodeMap;
+ private String nodeToReleaseMetadataManager;
+ private int requestId;
+ private Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
+ private FailbackPlanState state;
+
+ public static NodeFailbackPlan createPlan(String nodeId) {
+ return new NodeFailbackPlan(planIdGenerator++, nodeId);
+ }
+
+ private NodeFailbackPlan(long planId, String nodeId) {
+ this.planId = planId;
+ this.nodeId = nodeId;
+ participants = new HashSet<>();
+ partition2nodeMap = new HashMap<>();
+ pendingRequests = new HashMap<>();
+ state = FailbackPlanState.PREPARING;
+ }
+
+ public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
+ partition2nodeMap.put(partitionId, currentActiveNode);
+ }
+
+ public synchronized void addParticipant(String nodeId) {
+ participants.add(nodeId);
+ }
+
+ public synchronized void notifyNodeFailure(String failedNode) {
+ if (participants.contains(failedNode)) {
+ if (state == FailbackPlanState.PREPARING) {
+ state = FailbackPlanState.FAILED;
+ } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
+ /**
+ * if there is any pending request to this failed node,
+ * it should be marked as completed and the plan should be marked as failed
+ */
+ Set<Integer> failedRequests = new HashSet<>();
+ for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
+ if (request.getNodeID().equals(failedNode)) {
+ failedRequests.add(request.getRequestId());
+ }
+ }
+
+ if (failedRequests.size() > 0) {
+ state = FailbackPlanState.FAILED;
+ for (Integer failedRequestId : failedRequests) {
+ markRequestCompleted(failedRequestId);
+ }
+ }
+ }
+ } else if (nodeId.equals(failedNode)) {
+ //if the failing back node is the failed node itself
+ state = FailbackPlanState.FAILED;
+ updateState();
+ }
+ }
+
+ public synchronized Set<Integer> getPartitionsToFailback() {
+ return new HashSet<>(partition2nodeMap.keySet());
+ }
+
+ public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
+ //if this is the first request
+ if (pendingRequests.size() == 0) {
+ state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
+ }
+ pendingRequests.put(msg.getRequestId(), msg);
+ }
+
+ public synchronized void markRequestCompleted(int requestId) {
+ pendingRequests.remove(requestId);
+ updateState();
+ }
+
+ private void updateState() {
+ if (pendingRequests.size() == 0) {
+ switch (state) {
+ case PREPARING:
+ case FAILED:
+ state = FailbackPlanState.PENDING_ROLLBACK;
+ break;
+ case PENDING_PARTICIPANT_REPONSE:
+ state = FailbackPlanState.PENDING_COMPLETION;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
+ Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>();
+ /**
+ * for each participant, construct a request with the partitions
+ * that will be failed back or flushed.
+ */
+ for (String participant : participants) {
+ Set<Integer> partitionToPrepareForFailback = new HashSet<>();
+ for (Map.Entry<Integer, String> entry : partition2nodeMap.entrySet()) {
+ if (entry.getValue().equals(participant)) {
+ partitionToPrepareForFailback.add(entry.getKey());
+ }
+ }
+ PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
+ requestId++, participant, partitionToPrepareForFailback);
+ if (participant.equals(nodeToReleaseMetadataManager)) {
+ msg.setReleaseMetadataNode(true);
+ }
+ node2Partitions.add(msg);
+ }
+ return node2Partitions;
+ }
+
+ public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
+ return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public long getPlanId() {
+ return planId;
+ }
+
+ public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
+ this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
+ }
+
+ public synchronized FailbackPlanState getState() {
+ return state;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Plan ID: " + planId);
+ sb.append(" Failing back node: " + nodeId);
+ sb.append(" Participants: " + participants);
+ sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
index 0797a02..ae02ca9 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
@@ -23,23 +23,14 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-public class ReplicaEvent {
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
- /*
- * FAIL: remote replica failed.
- * JOIN: remote replica rejoined the cluster.
- * SHUTDOWN: remote replica is shutting down normally
- * */
- public enum ReplicaEventType {
- FAIL,
- JOIN,
- SHUTDOWN
- }
+public class ReplicaEvent {
Replica replica;
- ReplicaEventType eventType;
+ ClusterEventType eventType;
- public ReplicaEvent(Replica replica, ReplicaEventType eventType) {
+ public ReplicaEvent(Replica replica, ClusterEventType eventType) {
this.replica = replica;
this.eventType = eventType;
}
@@ -52,11 +43,11 @@ public class ReplicaEvent {
this.replica = replica;
}
- public ReplicaEventType getEventType() {
+ public ClusterEventType getEventType() {
return eventType;
}
- public void setEventType(ReplicaEventType eventType) {
+ public void setEventType(ClusterEventType eventType) {
this.eventType = eventType;
}
@@ -68,7 +59,7 @@ public class ReplicaEvent {
public static ReplicaEvent create(DataInput input) throws IOException {
Replica replica = Replica.create(input);
- ReplicaEventType eventType = ReplicaEventType.values()[input.readInt()];
+ ClusterEventType eventType = ClusterEventType.values()[input.readInt()];
ReplicaEvent event = new ReplicaEvent(replica, eventType);
return event;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index a88c985..7e27c54 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -31,10 +31,10 @@ public interface ILogRecord {
OK
}
- public static final int JOB_TERMINATE_LOG_SIZE = 18; //JOB_COMMIT or ABORT log type
+ public static final int JOB_TERMINATE_LOG_SIZE = 14; //JOB_COMMIT or ABORT log type
public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
public static final int UPDATE_LOG_BASE_SIZE = 59;
- public static final int FLUSH_LOG_SIZE = 22;
+ public static final int FLUSH_LOG_SIZE = 18;
public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
@@ -112,11 +112,9 @@ public interface ILogRecord {
public String getNodeId();
- public void setNodeId(String nodeId);
-
public int writeRemoteRecoveryLog(ByteBuffer buffer);
- public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
+ public RECORD_STATUS readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog);
public void setReplicationThread(IReplicationThread replicationThread);
@@ -130,4 +128,16 @@ public interface ILogRecord {
public ByteBuffer getSerializedLog();
+ public void setNodeId(String nodeId);
+
+ public int getResourcePartition();
+
+ public void setResourcePartition(int resourcePartition);
+
+ public void setReplicated(boolean replicated);
+
+ /**
+ * @return a flag indicating whether the log record should be sent to remote replicas
+ */
+ public boolean isReplicated();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index cb3f8c3..a3115e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -18,8 +18,9 @@
*/
package org.apache.asterix.common.transactions;
+import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Set;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -82,31 +83,25 @@ public interface IRecoveryManager {
/**
* Makes a system checkpoint.
- * @param isSharpCheckpoint a flag indicating whether to perform a sharp or non-sharp checkpoint.
- * @param nonSharpCheckpointTargetLSN if a non-sharp checkpoint to be performed, what is the minimum LSN it should target.
+ *
+ * @param isSharpCheckpoint
+ * a flag indicating whether to perform a sharp or non-sharp checkpoint.
+ * @param nonSharpCheckpointTargetLSN
+ * if a non-sharp checkpoint is to be performed, the minimum LSN it should target.
* @return the LSN at which the checkpoint was performed.
* @throws ACIDException
* @throws HyracksDataException
*/
- public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException;
+ public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
+ throws ACIDException, HyracksDataException;
/**
- * Performs recovery based on the passed logs
- * @param remoteLogs the remote logs to be replayed
- * @throws HyracksDataException
- * @throws ACIDException
- */
- public void replayRemoteLogs(ArrayList<ILogRecord> remoteLogs) throws HyracksDataException, ACIDException;
-
- /**
- *
* @return min first LSN of the open indexes (including remote indexes if replication is enabled)
* @throws HyracksDataException
*/
public long getMinFirstLSN() throws HyracksDataException;
/**
- *
* @return min first LSN of the open indexes
* @throws HyracksDataException
*/
@@ -114,11 +109,29 @@ public interface IRecoveryManager {
/**
* Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+ *
* @param partitions
* @param lowWaterMarkLSN
* @param failedNode
* @throws IOException
* @throws ACIDException
*/
- public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
+ public void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
+ throws IOException, ACIDException;
+
+ /**
+ * Creates a temporary file to be used during recovery
+ *
+ * @param jobId
+ * @param fileName
+ * @return the created temporary file
+ * @throws IOException
+ * if the file for the specified {@code jobId} with the {@code fileName} already exists
+ */
+ public File createJobRecoveryFile(int jobId, String fileName) throws IOException;
+
+ /**
+ * Deletes all temporary recovery files
+ */
+ public void deleteRecoveryTemporaryFiles();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d4a96a5..d09f6ca 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -20,8 +20,6 @@ package org.apache.asterix.common.transactions;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;
@@ -34,15 +32,14 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
/*
* == LogRecordFormat ==
* ---------------------------
- * [Header1] (10 bytes + NodeId Length) : for all log types
+ * [Header1] (6 bytes) : for all log types
* LogSource(1)
* LogType(1)
* JobId(4)
- * NodeIdLength(4)
- * NodeId(?)
* ---------------------------
- * [Header2] (12 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
+ * [Header2] (16 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
+ * ResourcePartition(4)
* PKHashValue(4)
* PKValueSize(4)
* PKValue(PKValueSize)
@@ -52,7 +49,7 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
* ResourceId(8) //stored in .metadata of the corresponding index in NC node
* LogRecordSize(4)
* ---------------------------
- * [Body] (Variable size) : only for update log type
+ * [Body] (9 bytes + NewValueSize) : only for update log type
* FieldCnt(4)
* NewOp(1)
* NewValueSize(4)
@@ -62,19 +59,18 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
* Checksum(8)
* ---------------------------
* = LogSize =
- * 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
- * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
- * --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
- * 3) UPDATE: 54 + PKValueSize + NewValueSize (5 + 12 + PKValueSize + 20 + 9 + NewValueSize + 8)
- * 4) FLUSH: 5 + 8 + DatasetId(4) (In case of serialize: + (8 bytes for LSN) + (4 bytes for number of flushed indexes)
+ * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
+ * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
+ * 3) UPDATE: (Header1(6) + Header2(16) + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
+ * --> UPDATE_LOG_BASE_SIZE = 59
+ * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
*/
public class LogRecord implements ILogRecord {
// ------------- fields in a log record (begin) ------------//
private byte logSource;
- private String nodeId;
- private int nodeIdLength;
private byte logType;
private int jobId;
private int datasetId;
@@ -83,6 +79,7 @@ public class LogRecord implements ILogRecord {
private ITupleReference PKValue;
private long prevLSN;
private long resourceId;
+ private int resourcePartition;
private int logSize;
private int fieldCnt;
private byte newOp;
@@ -103,9 +100,13 @@ public class LogRecord implements ILogRecord {
private PrimaryIndexOperationTracker opTracker;
private IReplicationThread replicationThread;
private ByteBuffer serializedLog;
- private final Map<String, byte[]> nodeIdsMap;
- // this field is used for serialized flush logs only to indicate how many indexes were flushed using its LSN.
+ /**
+ * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only
+ * to indicate the source of the log and how many indexes were flushed using its LSN.
+ */
private int numOfFlushedIndexes;
+ private String nodeId;
+ private boolean replicated = false;
public LogRecord() {
isFlushed = new AtomicBoolean(false);
@@ -113,16 +114,15 @@ public class LogRecord implements ILogRecord {
readPKValue = new PrimaryKeyTupleReference();
readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
checksumGen = new CRC32();
- this.nodeIdsMap = new HashMap<String, byte[]>();
logSource = LogSource.LOCAL;
}
private final static int LOG_SOURCE_LEN = Byte.BYTES;
- private final static int NODE_ID_STRING_LENGTH = Integer.BYTES;
private final static int TYPE_LEN = Byte.BYTES;
public final static int PKHASH_LEN = Integer.BYTES;
public final static int PKSZ_LEN = Integer.BYTES;
private final static int PRVLSN_LEN = Long.BYTES;
+ private final static int RS_PARTITION_LEN = Integer.BYTES;
private final static int RSID_LEN = Long.BYTES;
private final static int LOGRCD_SZ_LEN = Integer.BYTES;
private final static int FLDCNT_LEN = Integer.BYTES;
@@ -130,27 +130,19 @@ public class LogRecord implements ILogRecord {
private final static int NEWVALSZ_LEN = Integer.BYTES;
private final static int CHKSUM_LEN = Long.BYTES;
- private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES + NODE_ID_STRING_LENGTH;
- private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+ private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+ private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
+ + PKSZ_LEN;
private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+ private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
private void writeLogRecordCommonFields(ByteBuffer buffer) {
buffer.put(logSource);
buffer.put(logType);
buffer.putInt(jobId);
- if (nodeIdsMap.containsKey(nodeId)) {
- buffer.put(nodeIdsMap.get(nodeId));
- } else {
- // byte array for node id length and string
- byte[] bytes = new byte[(Integer.SIZE / 8) + nodeId.length()];
- buffer.putInt(nodeId.length());
- buffer.put(nodeId.getBytes(java.nio.charset.StandardCharsets.UTF_8));
- buffer.position(buffer.position() - bytes.length);
- buffer.get(bytes, 0, bytes.length);
- nodeIdsMap.put(nodeId, bytes);
- }
if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT) {
+ buffer.putInt(resourcePartition);
buffer.putInt(datasetId);
buffer.putInt(PKHashValue);
if (PKValueSize <= 0) {
@@ -168,7 +160,6 @@ public class LogRecord implements ILogRecord {
buffer.putInt(newValueSize);
writeTuple(buffer, newValue, newValueSize);
}
-
if (logType == LogType.FLUSH) {
buffer.putInt(datasetId);
}
@@ -188,10 +179,9 @@ public class LogRecord implements ILogRecord {
int beginOffset = buffer.position();
writeLogRecordCommonFields(buffer);
- if (logSource == LogSource.LOCAL) {
- // copy the serialized log to send it to replicas
- int serializedLogSize = getSerializedLogSize(logType, logSize);
-
+ if (replicated) {
+ //copy the serialized log to send it to replicas
+ int serializedLogSize = getSerializedLogSize();
if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
serializedLog = ByteBuffer.allocate(serializedLogSize);
} else {
@@ -207,6 +197,8 @@ public class LogRecord implements ILogRecord {
if (logType == LogType.FLUSH) {
serializedLog.putLong(appendLSN);
serializedLog.putInt(numOfFlushedIndexes);
+ serializedLog.putInt(nodeId.length());
+ serializedLog.put(nodeId.getBytes());
}
serializedLog.flip();
buffer.position(currentPosition);
@@ -244,15 +236,8 @@ public class LogRecord implements ILogRecord {
public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
- // read header
- RECORD_STATUS status = readLogHeader(buffer);
- if (status != RECORD_STATUS.OK) {
- buffer.position(beginOffset);
- return status;
- }
-
- // read body
- status = readLogBody(buffer, false);
+ //read common fields
+ RECORD_STATUS status = readLogCommonFields(buffer);
if (status != RECORD_STATUS.OK) {
buffer.position(beginOffset);
return status;
@@ -271,38 +256,25 @@ public class LogRecord implements ILogRecord {
return RECORD_STATUS.OK;
}
- private RECORD_STATUS readLogHeader(ByteBuffer buffer) {
- // first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
+ private RECORD_STATUS readLogCommonFields(ByteBuffer buffer) {
+ //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
return RECORD_STATUS.TRUNCATED;
}
logSource = buffer.get();
logType = buffer.get();
jobId = buffer.getInt();
- nodeIdLength = buffer.getInt();
- // attempt to read node id
- if (buffer.remaining() < nodeIdLength) {
- return RECORD_STATUS.TRUNCATED;
- }
- // read node id string
- nodeId = new String(buffer.array(), buffer.position() + buffer.arrayOffset(), nodeIdLength,
- java.nio.charset.StandardCharsets.UTF_8);
- // skip node id string bytes
- buffer.position(buffer.position() + nodeIdLength);
- return RECORD_STATUS.OK;
- }
-
- private RECORD_STATUS readLogBody(ByteBuffer buffer, boolean allocateTupleBuffer) {
if (logType != LogType.FLUSH) {
if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
datasetId = -1;
PKHashValue = -1;
} else {
- // attempt to read in the dsid, PK hash and PK length
+ //attempt to read in the resourcePartition, dsid, PK hash and PK length
if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
return RECORD_STATUS.TRUNCATED;
}
+ resourcePartition = buffer.getInt();
datasetId = buffer.getInt();
PKHashValue = buffer.getInt();
PKValueSize = buffer.getInt();
@@ -330,16 +302,7 @@ public class LogRecord implements ILogRecord {
if (buffer.remaining() < newValueSize) {
return RECORD_STATUS.TRUNCATED;
}
- if (!allocateTupleBuffer) {
- newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
- } else {
- ByteBuffer tupleBuffer = ByteBuffer.allocate(newValueSize);
- tupleBuffer.put(buffer.array(), buffer.position(), newValueSize);
- tupleBuffer.flip();
- newValue = readTuple(tupleBuffer, readNewValue, fieldCnt, newValueSize);
- // skip tuple bytes
- buffer.position(buffer.position() + newValueSize);
- }
+ newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
} else {
computeAndSetLogSize();
}
@@ -356,25 +319,47 @@ public class LogRecord implements ILogRecord {
}
@Override
- public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
- readLogHeader(buffer);
- if (!remoteRecoveryLog || !nodeId.equals(localNodeId)) {
- readLogBody(buffer, false);
- } else {
- // need to allocate buffer for tuple since the logs will be kept in memory to use during remote recovery
- // TODO when this is redesigned to spill remote recovery logs to disk, this will not be needed
- readLogBody(buffer, true);
+ public RECORD_STATUS readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) {
+ int beginOffset = buffer.position();
+
+ //read common fields
+ RECORD_STATUS status = readLogCommonFields(buffer);
+ if (status != RECORD_STATUS.OK) {
+ buffer.position(beginOffset);
+ return status;
}
if (logType == LogType.FLUSH) {
- LSN = buffer.getLong();
- numOfFlushedIndexes = buffer.getInt();
+ if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
+ LSN = buffer.getLong();
+ numOfFlushedIndexes = buffer.getInt();
+ //read serialized node id
+ int nodeIdLength = buffer.getInt();
+ if (buffer.remaining() >= nodeIdLength) {
+ byte[] nodeIdBytes = new byte[nodeIdLength];
+ buffer.get(nodeIdBytes);
+ nodeId = new String(nodeIdBytes);
+ } else {
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ } else {
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
}
- // remote recovery logs need to have the LSN to check which should be replayed
- if (remoteRecoveryLog && nodeId.equals(localNodeId)) {
- LSN = buffer.getLong();
+ //remote recovery logs need to have the LSN to check which should be replayed
+ if (remoteRecoveryLog) {
+ if (buffer.remaining() >= Long.BYTES) {
+ LSN = buffer.getLong();
+ } else {
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
}
+
+ return RECORD_STATUS.OK;
}
private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -430,21 +415,19 @@ public class LogRecord implements ILogRecord {
default:
throw new IllegalStateException("Unsupported Log Type");
}
-
- logSize += nodeIdLength;
}
@Override
public String getLogRecordForDisplay() {
StringBuilder builder = new StringBuilder();
builder.append(" Source : ").append(LogSource.toString(logSource));
- builder.append(" NodeID : ").append(nodeId);
builder.append(" LSN : ").append(LSN);
builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
builder.append(" DatasetId : ").append(datasetId);
+ builder.append(" ResourcePartition : ").append(resourcePartition);
builder.append(" PKHashValue : ").append(PKHashValue);
builder.append(" PKFieldCnt : ").append(PKFieldCnt);
builder.append(" PKSize: ").append(PKValueSize);
@@ -460,11 +443,8 @@ public class LogRecord implements ILogRecord {
public int writeRemoteRecoveryLog(ByteBuffer buffer) {
int bufferBegin = buffer.position();
writeLogRecordCommonFields(buffer);
- if (logType == LogType.FLUSH) {
- buffer.putLong(LSN);
- buffer.putInt(numOfFlushedIndexes);
- }
- // LSN must be included in all remote recovery logs (not only FLUSH)
+ //FLUSH logs should not be included in remote recovery
+ //LSN must be included in all remote recovery logs
buffer.putLong(LSN);
return buffer.position() - bufferBegin;
}
@@ -560,21 +540,21 @@ public class LogRecord implements ILogRecord {
@Override
public int getSerializedLogSize() {
- return getSerializedLogSize(logType, logSize);
- }
-
- private static int getSerializedLogSize(Byte logType, int logSize) {
+ int serilizedSize = logSize;
if (logType == LogType.FLUSH) {
- // LSN
- logSize += (Long.SIZE / 8);
- // num of indexes
- logSize += (Integer.SIZE / 8);
+ //LSN
+ serilizedSize += Long.BYTES;
+ //num of indexes
+ serilizedSize += Integer.BYTES;
+ //serialized node id String
+ serilizedSize += Integer.BYTES + nodeId.length();
}
-
- // checksum not included in serialized version
- logSize -= CHKSUM_LEN;
-
- return logSize;
+ if (logSource == LogSource.REMOTE_RECOVERY) {
+ //for LSN;
+ serilizedSize += Long.BYTES;
+ }
+ serilizedSize -= CHKSUM_LEN;
+ return serilizedSize;
}
@Override
@@ -675,7 +655,6 @@ public class LogRecord implements ILogRecord {
@Override
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
- this.nodeIdLength = nodeId.length();
}
public IReplicationThread getReplicationThread() {
@@ -713,4 +692,23 @@ public class LogRecord implements ILogRecord {
this.opTracker = opTracker;
}
-}
+ @Override
+ public int getResourcePartition() {
+ return resourcePartition; //serialized in the header of UPDATE, ENTITY_COMMIT and UPSERT_ENTITY_COMMIT records
+ }
+
+ @Override
+ public void setResourcePartition(int resourcePartition) {
+ this.resourcePartition = resourcePartition; //written right before the dataset id for UPDATE and entity-commit log types
+ }
+
+ @Override
+ public void setReplicated(boolean replicate) {
+ this.replicated = replicate; //when true, writing the record also keeps a serialized copy to send to replicas
+ }
+
+ @Override
+ public boolean isReplicated() {
+ return replicated; //field is initialized to false and assigned by setReplicated
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
new file mode 100644
index 0000000..b75d16c
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+// Non-instantiable holder for the Servlets path constants.
+public class ServletUtil {
+ // Servlet path constants: each entry carries the path string passed to its constructor.
+ public enum Servlets {
+ AQL("/aql"),
+ AQL_QUERY("/query"),
+ AQL_UPDATE("/update"),
+ AQL_DDL("/ddl"),
+ SQLPP("/sqlpp"),
+ SQLPP_QUERY("/query/sqlpp"),
+ SQLPP_UPDATE("/update/sqlpp"),
+ SQLPP_DDL("/ddl/sqlpp"),
+ QUERY_STATUS("/query/status"),
+ QUERY_RESULT("/query/result"),
+ QUERY_SERVICE("/query/service"),
+ CONNECTOR("/connector"),
+ SHUTDOWN("/admin/shutdown"),
+ VERSION("/admin/version"),
+ CLUSTER_STATE("/admin/cluster");
+
+ private final String path;
+
+ private Servlets(String path) {
+ this.path = path;
+ }
+ // Returns the path string this constant was declared with.
+ public String getPath() {
+ return path;
+ }
+ }
+ // Guard constructor: always throws, so no instance of this class can be created.
+ private ServletUtil() {
+ throw new AssertionError("No objects of this class should be created.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 48e42bd..5b4035c 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -66,4 +66,8 @@ public class StoragePathUtil {
private static String prepareFullIndexName(String datasetName, String idxName) {
return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
}
+ // Parses the integer that follows the first PARTITION_DIR_PREFIX.length() characters of name; the prefix itself is not verified.
+ public static int getPartitonNumFromName(String name) {
+ return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length())); // NumberFormatException if the remainder is not an integer
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 97674e6..1d5b15e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -28,61 +28,39 @@ public class TransactionUtil {
public static void formJobTerminateLogRecord(ITransactionContext txnCtx, LogRecord logRecord, boolean isCommit) {
logRecord.setTxnCtx(txnCtx);
- TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getJobId().getId(), isCommit,
- logRecord.getNodeId());
+ TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getJobId().getId(), isCommit);
}
- public static void formJobTerminateLogRecord(LogRecord logRecord, int jobId, boolean isCommit, String nodeId) {
+ public static void formJobTerminateLogRecord(LogRecord logRecord, int jobId, boolean isCommit) {
logRecord.setLogType(isCommit ? LogType.JOB_COMMIT : LogType.ABORT);
logRecord.setDatasetId(-1);
logRecord.setPKHashValue(-1);
logRecord.setJobId(jobId);
- logRecord.setNodeId(nodeId);
logRecord.computeAndSetLogSize();
}
public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker,
- int numOfFlushedIndexes) {
- formFlushLogRecord(logRecord, datasetId, opTracker, null, numOfFlushedIndexes);
- }
-
- public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker,
String nodeId, int numberOfIndexes) {
logRecord.setLogType(LogType.FLUSH);
logRecord.setJobId(-1);
logRecord.setDatasetId(datasetId);
logRecord.setOpTracker(opTracker);
logRecord.setNumOfFlushedIndexes(numberOfIndexes);
- if (nodeId != null) {
- logRecord.setNodeId(nodeId);
- }
+ logRecord.setNodeId(nodeId);
logRecord.computeAndSetLogSize();
}
public static void formEntityCommitLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
- int PKHashValue, ITupleReference PKValue, int[] PKFields) {
- logRecord.setTxnCtx(txnCtx);
- logRecord.setLogType(LogType.ENTITY_COMMIT);
- logRecord.setJobId(txnCtx.getJobId().getId());
- logRecord.setDatasetId(datasetId);
- logRecord.setPKHashValue(PKHashValue);
- logRecord.setPKFieldCnt(PKFields.length);
- logRecord.setPKValue(PKValue);
- logRecord.setPKFields(PKFields);
- logRecord.computeAndSetPKValueSize();
- logRecord.computeAndSetLogSize();
- }
-
- public static void formEntityUpsertCommitLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
- int PKHashValue, ITupleReference PKValue, int[] PKFields) {
+ int PKHashValue, ITupleReference PKValue, int[] PKFields, int resourcePartition, byte entityCommitType) {
logRecord.setTxnCtx(txnCtx);
- logRecord.setLogType(LogType.UPSERT_ENTITY_COMMIT);
+ logRecord.setLogType(entityCommitType);
logRecord.setJobId(txnCtx.getJobId().getId());
logRecord.setDatasetId(datasetId);
logRecord.setPKHashValue(PKHashValue);
logRecord.setPKFieldCnt(PKFields.length);
logRecord.setPKValue(PKValue);
logRecord.setPKFields(PKFields);
+ logRecord.setResourcePartition(resourcePartition);
logRecord.computeAndSetPKValueSize();
logRecord.computeAndSetLogSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index f54db63..29e08b5 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -24,7 +24,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
-import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
@@ -39,6 +38,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.utils.ServletUtil.Servlets;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
import org.apache.asterix.testframework.context.TestFileContext;
@@ -56,7 +56,6 @@ import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
public class TestExecutor {
@@ -262,6 +261,17 @@ public class TestExecutor {
return method.getResponseBodyAsStream();
}
+ public InputStream executeClusterStateQuery(OutputFormat fmt, String url) throws Exception { // HTTP GET on url; returns the response body stream
+ HttpMethodBase method = new GetMethod(url);
+
+ //Set the accepted response type to the MIME type of the requested output format
+ method.setRequestHeader("Accept", fmt.mimeType());
+ // Install a custom retry handler: DefaultHttpMethodRetryHandler(retryCount = 3, requestSentRetryEnabled = false)
+ method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
+ executeHttpMethod(method);
+ return method.getResponseBodyAsStream();
+ }
+
// To execute Update statements
// Insert and Delete statements are executed here
public void executeUpdate(String str, String url) throws Exception {
@@ -301,7 +311,7 @@ public class TestExecutor {
}
private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
- final String url = "http://" + host + ":" + port + "/query/result";
+ final String url = "http://" + host + ":" + port + Servlets.QUERY_RESULT.getPath();
// Create a method instance.
GetMethod method = new GetMethod(url);
@@ -372,6 +382,14 @@ public class TestExecutor {
return IOUtils.toString(input, StandardCharsets.UTF_8.name());
}
+ /**
+ * Runs a managix command on the vagrant "cc" node through {@code vagrant ssh cc -- <MANAGIX_HOME><command>}
+ * and returns everything the process wrote to its output stream, decoded as UTF-8.
+ *
+ * @param pb
+ * process builder whose environment holds the {@code MANAGIX_HOME} entry that is prepended to the command
+ * @param command
+ * managix arguments, appended directly after the {@code MANAGIX_HOME} value
+ * @return the captured process output
+ * @throws Exception
+ * if the process cannot be started, its output cannot be read, or the wait is interrupted
+ */
+ private static String executeVagrantManagix(ProcessBuilder pb, String command) throws Exception {
+ pb.command("vagrant", "ssh", "cc", "--", pb.environment().get("MANAGIX_HOME") + command);
+ Process p = pb.start();
+ String output;
+ // Drain the output BEFORE waiting: a child that fills the pipe buffer blocks on write,
+ // so calling waitFor() first could hang forever. Reading to EOF also releases the stream.
+ try (InputStream input = p.getInputStream()) {
+ output = IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ }
+ p.waitFor();
+ return output;
+ }
+
private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
String targetWord = "queries" + File.separator;
int targetWordSize = targetWord.lastIndexOf(File.separator);
@@ -439,9 +457,9 @@ public class TestExecutor {
switch (ctx.getType()) {
case "ddl":
if (ctx.getFile().getName().endsWith("aql")) {
- executeDDL(statement, "http://" + host + ":" + port + "/ddl");
+ executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
} else {
- executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
+ executeDDL(statement, "http://" + host + ":" + port + Servlets.SQLPP_DDL.getPath());
}
break;
case "update":
@@ -451,9 +469,10 @@ public class TestExecutor {
"127.0.0.1://../../../../../../asterix-app/");
}
if (ctx.getFile().getName().endsWith("aql")) {
- executeUpdate(statement, "http://" + host + ":" + port + "/update");
+ executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
} else {
- executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
+ executeUpdate(statement,
+ "http://" + host + ":" + port + Servlets.SQLPP_UPDATE.getPath());
}
break;
case "query":
@@ -471,24 +490,26 @@ public class TestExecutor {
if (ctx.getFile().getName().endsWith("aql")) {
if (ctx.getType().equalsIgnoreCase("query")) {
resultStream = executeQuery(statement, fmt,
- "http://" + host + ":" + port + "/query", cUnit.getParameter());
+ "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(),
+ cUnit.getParameter());
} else if (ctx.getType().equalsIgnoreCase("async")) {
resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://" + host + ":" + port + "/aql");
+ "http://" + host + ":" + port + Servlets.AQL.getPath());
} else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://" + host + ":" + port + "/aql");
+ "http://" + host + ":" + port + Servlets.AQL.getPath());
}
} else {
if (ctx.getType().equalsIgnoreCase("query")) {
resultStream = executeQuery(statement, fmt,
- "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
+ "http://" + host + ":" + port + Servlets.SQLPP_QUERY.getPath(),
+ cUnit.getParameter());
} else if (ctx.getType().equalsIgnoreCase("async")) {
resultStream = executeAnyAQLAsync(statement, false, fmt,
- "http://" + host + ":" + port + "/sqlpp");
+ "http://" + host + ":" + port + Servlets.SQLPP.getPath());
} else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
resultStream = executeAnyAQLAsync(statement, true, fmt,
- "http://" + host + ":" + port + "/sqlpp");
+ "http://" + host + ":" + port + Servlets.SQLPP.getPath());
}
}
@@ -505,9 +526,6 @@ public class TestExecutor {
runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
actualResultFile);
- LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
- + " PASSED ");
-
queryCount++;
break;
case "mgx":
@@ -515,7 +533,7 @@ public class TestExecutor {
break;
case "txnqbc": //qbc represents query before crash
resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://" + host + ":" + port + "/query", cUnit.getParameter());
+ "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
qbcFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
+ cUnit.getName() + "_qbc.adm");
@@ -524,20 +542,17 @@ public class TestExecutor {
break;
case "txnqar": //qar represents query after recovery
resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
- "http://" + host + ":" + port + "/query", cUnit.getParameter());
+ "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
qarFile = new File(actualPath + File.separator
+ testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
+ cUnit.getName() + "_qar.adm");
qarFile.getParentFile().mkdirs();
writeOutputToFile(qarFile, resultStream);
runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), qbcFile, qarFile);
-
- LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
- + " PASSED ");
break;
case "txneu": //eu represents erroneous update
try {
- executeUpdate(statement, "http://" + host + ":" + port + "/update");
+ executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
} catch (Exception e) {
//An exception is expected.
failed = true;
@@ -565,7 +580,7 @@ public class TestExecutor {
break;
case "errddl": // a ddlquery that expects error
try {
- executeDDL(statement, "http://" + host + ":" + port + "/ddl");
+ executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
} catch (Exception e) {
// expected error happens
failed = true;
@@ -576,7 +591,7 @@ public class TestExecutor {
}
System.err.println("...but that was expected.");
break;
- case "vagrant_script":
+ case "vscript": //a script that will be executed on a vagrant virtual node
try {
String[] command = statement.trim().split(" ");
if (command.length != 2) {
@@ -592,6 +607,32 @@ public class TestExecutor {
throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
}
break;
+ case "vmgx": //a managix command that will be executed on vagrant cc node
+ try {
+ String output = executeVagrantManagix(pb, statement);
+ if (output.contains("ERROR")) {
+ throw new Exception(output);
+ }
+ } catch (Exception e) {
+ throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+ }
+ break;
+ case "cstate": //cluster state query
+ try {
+ fmt = OutputFormat.forCompilationUnit(cUnit);
+ resultStream = executeClusterStateQuery(fmt,
+ "http://" + host + ":" + port + Servlets.CLUSTER_STATE.getPath());
+ expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
+ actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
+ actualResultFile.getParentFile().mkdirs();
+ writeOutputToFile(actualResultFile, resultStream);
+ runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
+ actualResultFile);
+ queryCount++;
+ } catch (Exception e) {
+ throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+ }
+ break;
default:
throw new IllegalArgumentException("No statements of type " + ctx.getType());
}
@@ -626,6 +667,9 @@ public class TestExecutor {
"Test \"" + cUnit.getName() + "\" FAILED!\nExpected error was not thrown...");
e.printStackTrace();
throw e;
+ } else if (numOfFiles == testFileCtxs.size()) {
+ LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
+ + " PASSED ");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
index 86c15ae..62ca9bf 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -58,7 +58,9 @@ public class ReplicationIT {
private static ProcessBuilder pb;
private static Map<String, String> env;
private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
- private static String SCRIPT_HOME;
+ private static String SCRIPT_HOME = "/vagrant/scripts/";
+ private static String MANAGIX_HOME = "/tmp/asterix/bin/managix ";
+ private static final String INSTANCE_NAME = "asterix";
protected TestCaseContext tcCtx;
public ReplicationIT(TestCaseContext tcCtx) {
@@ -107,10 +109,10 @@ public class ReplicationIT {
remoteInvoke("cp -r /vagrant/" + managixFolderName + " /tmp/asterix");
- SCRIPT_HOME = "/vagrant/scripts/";
pb = new ProcessBuilder();
env = pb.environment();
env.put("SCRIPT_HOME", SCRIPT_HOME);
+ env.put("MANAGIX_HOME", MANAGIX_HOME);
File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
pb.directory(cwd);
pb.redirectErrorStream(true);
@@ -141,13 +143,13 @@ public class ReplicationIT {
@Before
public void beforeTest() throws Exception {
//create instance
- managixInvoke("create -n vagrant-ssh -c /vagrant/cluster_with_replication.xml").getInputStream();
+ managixInvoke("create -n " + INSTANCE_NAME + " -c /vagrant/cluster_with_replication.xml").getInputStream();
}
@After
public void afterTest() throws Exception {
//stop instance
- managixInvoke("stop -n vagrant-ssh");
+ managixInvoke("stop -n " + INSTANCE_NAME);
//verify that all processes have been stopped
String killProcesses = "kill_cc_and_nc.sh";
@@ -162,7 +164,7 @@ public class ReplicationIT {
executeVagrantScript("nc2", deleteStorage);
//delete instance
- managixInvoke("delete -n vagrant-ssh");
+ managixInvoke("delete -n " + INSTANCE_NAME);
}
@Test
@@ -244,7 +246,7 @@ public class ReplicationIT {
}
private static Process managixInvoke(String cmd) throws Exception {
- return remoteInvoke("/tmp/asterix/bin/managix " + cmd);
+ return remoteInvoke(MANAGIX_HOME + cmd);
}
private static String executeVagrantScript(String node, String scriptName) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql
new file mode 100644
index 0000000..2c49a01
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover completes, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+ organization-name: string,
+ start-date: date,
+ end-date: date?
+}
+
+create type FacebookUserType as closed {
+ id: int,
+ alias: string,
+ name: string,
+ user-since: datetime,
+ friend-ids: {{ int32 }},
+ employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
+
+create dataset FacebookUsersInMemory(FacebookUserType)
+primary key id;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql
new file mode 100644
index 0000000..bd01d99
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover completes, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql
new file mode 100644
index 0000000..b09c3d3
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover completes, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
new file mode 100644
index 0000000..47f5975
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : node_failback.aql
+ * Description : Make sure node failback completes as expected.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset,
+ kill one node and wait until the failover completes, query cluster state,
+ query data, insert new data, start the killed node and wait for failback,
+ query cluster state, query data.
+ * Expected Result : Success
+ * Date : February 3 2016
+ */
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+
+insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file