You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/27 00:31:25 UTC
[2/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
deleted file mode 100644
index 8e020eb..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
-import org.apache.asterix.replication.storage.LSMComponentProperties;
-
-public class AsterixReplicationProtocol {
-
- /**
- * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
- */
- public static final String JOB_COMMIT_ACK = "$";
-
- public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
- public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
-
- /*
- * ReplicationRequestType:
- * REPLICATE_LOG: txn log replication
- * REPLICATE_FILE: replicate a file(s)
- * DELETE_FILE: delete a file(s)
- * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
- * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
- * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
- * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
- * UPDATE_REPLICA: used to update replica info such as IP Address change.
- * GOODBYE: used to notify replicas that the replication request has been completed
- * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
- * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
- * ACK: used to notify the requesting replica that the request has been completed successfully
- * FLUSH_INDEX: request remote replica to flush an LSM component
- */
- public enum ReplicationRequestType {
- REPLICATE_LOG,
- REPLICATE_FILE,
- DELETE_FILE,
- GET_REPLICA_FILES,
- GET_REPLICA_LOGS,
- GET_REPLICA_MAX_LSN,
- GET_REPLICA_MIN_LSN,
- UPDATE_REPLICA,
- GOODBYE,
- REPLICA_EVENT,
- LSM_COMPONENT_PROPERTIES,
- ACK,
- FLUSH_INDEX
- }
-
- public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
- //read request size
- NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
- int requestSize = dataBuffer.getInt();
-
- if (dataBuffer.capacity() < requestSize) {
- dataBuffer = ByteBuffer.allocate(requestSize);
- }
-
- //read request
- NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
-
- return dataBuffer;
- }
-
- public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
- throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- lsmCompProp.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (buffer.capacity() < requestSize) {
- buffer = ByteBuffer.allocate(requestSize);
- } else {
- buffer.clear();
- }
- buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- return buffer;
- }
-
- public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
- throws IOException {
- //read replication request type
- NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
-
- ReplicationRequestType requestType = AsterixReplicationProtocol.ReplicationRequestType.values()[byteBuffer
- .getInt()];
- return requestType;
- }
-
- public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return LSMComponentProperties.create(dis);
- }
-
- public static ByteBuffer getGoodbyeBuffer() {
- ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
- bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
- bb.flip();
- return bb;
- }
-
- public static ByteBuffer getAckBuffer() {
- ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
- bb.putInt(ReplicationRequestType.ACK.ordinal());
- bb.flip();
- return bb;
- }
-
- public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
- requestBuffer.clear();
- //put request type (4 bytes)
- requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
- //leave space for log size
- requestBuffer.position(requestBuffer.position() + Integer.BYTES);
- int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
- //put request size (4 bytes)
- requestBuffer.putInt(4, logSize);
- requestBuffer.flip();
- }
-
- public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
- requestBuffer.clear();
- //put request type (4 bytes)
- requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
- //length of the log
- requestBuffer.putInt(serializedLog.length);
- //the log itself
- requestBuffer.put(serializedLog);
- requestBuffer.flip();
- }
-
- public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, AsterixLSMIndexFileProperties afp,
- ReplicationRequestType requestType) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- afp.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (requestBuffer.capacity() < requestSize) {
- requestBuffer = ByteBuffer.allocate(requestSize);
- } else {
- requestBuffer.clear();
- }
- requestBuffer.putInt(requestType.ordinal());
- requestBuffer.putInt(oos.size());
- requestBuffer.put(outputStream.toByteArray());
- requestBuffer.flip();
- return requestBuffer;
- }
-
- public static AsterixLSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return AsterixLSMIndexFileProperties.create(dis);
- }
-
- public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return ReplicaLogsRequest.create(dis);
- }
-
- public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
- throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- request.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (requestBuffer.capacity() < requestSize) {
- requestBuffer = ByteBuffer.allocate(requestSize);
- } else {
- requestBuffer.clear();
- }
- requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
- requestBuffer.putInt(oos.size());
- requestBuffer.put(outputStream.toByteArray());
- requestBuffer.flip();
- return requestBuffer;
- }
-
- public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
-
- oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
- replica.writeFields(oos);
- oos.close();
-
- ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
- buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- return buffer;
- }
-
- public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- event.serialize(oos);
- oos.close();
-
- ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
- buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- return buffer;
- }
-
- public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return Replica.create(dis);
- }
-
- public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
-
- return ReplicaEvent.create(dis);
- }
-
- public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- request.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (buffer.capacity() < requestSize) {
- buffer = ByteBuffer.allocate(requestSize);
- } else {
- buffer.clear();
- }
- buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- }
-
- public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
- throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- request.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (buffer.capacity() < requestSize) {
- buffer = ByteBuffer.allocate(requestSize);
- } else {
- buffer.clear();
- }
- buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- return buffer;
- }
-
- public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return ReplicaFilesRequest.create(dis);
- }
-
- public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return ReplicaIndexFlushRequest.create(dis);
- }
-
- public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
- requestBuffer.clear();
- requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
- requestBuffer.flip();
- }
-
- public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
- requestBuffer.clear();
- requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
- requestBuffer.flip();
- }
-
- public static int getJobIdFromLogAckMessage(String msg) {
- return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
- }
-
- public static String getNodeIdFromLogAckMessage(String msg) {
- return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
- }
-
- /**
- * Sends a goodbye request to a remote replica indicating the end of a replication request.
- *
- * @param socketChannel
- * the remote replica socket.
- * @throws IOException
- */
- public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
- ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
- NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
- }
-
- public static void sendAck(SocketChannel socketChannel) throws IOException {
- ByteBuffer ackBuffer = AsterixReplicationProtocol.getAckBuffer();
- NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
new file mode 100644
index 0000000..be8f8e3
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.functions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+
+public class ReplicationProtocol {
+
+ /**
+ * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
+ */
+ public static final String JOB_COMMIT_ACK = "$";
+
+ public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+ public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+
+ /*
+ * ReplicationRequestType:
+ * REPLICATE_LOG: txn log replication
+ * REPLICATE_FILE: replicate a file(s)
+ * DELETE_FILE: delete a file(s)
+ * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
+ * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
+ * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
+ * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
+ * UPDATE_REPLICA: used to update replica info such as IP Address change.
+ * GOODBYE: used to notify replicas that the replication request has been completed
+ * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
+ * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
+ * ACK: used to notify the requesting replica that the request has been completed successfully
+ * FLUSH_INDEX: request remote replica to flush an LSM component
+ */
+ public enum ReplicationRequestType {
+ REPLICATE_LOG,
+ REPLICATE_FILE,
+ DELETE_FILE,
+ GET_REPLICA_FILES,
+ GET_REPLICA_LOGS,
+ GET_REPLICA_MAX_LSN,
+ GET_REPLICA_MIN_LSN,
+ UPDATE_REPLICA,
+ GOODBYE,
+ REPLICA_EVENT,
+ LSM_COMPONENT_PROPERTIES,
+ ACK,
+ FLUSH_INDEX
+ }
+
+ public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
+ //read request size
+ NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
+ int requestSize = dataBuffer.getInt();
+
+ if (dataBuffer.capacity() < requestSize) {
+ dataBuffer = ByteBuffer.allocate(requestSize);
+ }
+
+ //read request
+ NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
+
+ return dataBuffer;
+ }
+
+ public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
+ throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+ lsmCompProp.serialize(oos);
+ oos.close();
+
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (buffer.capacity() < requestSize) {
+ buffer = ByteBuffer.allocate(requestSize);
+ } else {
+ buffer.clear();
+ }
+ buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
+ }
+
+ public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
+ throws IOException {
+ //read replication request type
+ NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
+
+ ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
+ .getInt()];
+ return requestType;
+ }
+
+ public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+ return LSMComponentProperties.create(dis);
+ }
+
+ public static ByteBuffer getGoodbyeBuffer() {
+ ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+ bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
+ bb.flip();
+ return bb;
+ }
+
+ public static ByteBuffer getAckBuffer() {
+ ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+ bb.putInt(ReplicationRequestType.ACK.ordinal());
+ bb.flip();
+ return bb;
+ }
+
+ public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
+ requestBuffer.clear();
+ //put request type (4 bytes)
+ requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
+ //leave space for log size
+ requestBuffer.position(requestBuffer.position() + Integer.BYTES);
+ int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
+ //put request size (4 bytes)
+ requestBuffer.putInt(4, logSize);
+ requestBuffer.flip();
+ }
+
+ public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
+ requestBuffer.clear();
+ //put request type (4 bytes)
+ requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
+ //length of the log
+ requestBuffer.putInt(serializedLog.length);
+ //the log itself
+ requestBuffer.put(serializedLog);
+ requestBuffer.flip();
+ }
+
+ public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
+ ReplicationRequestType requestType) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+ afp.serialize(oos);
+ oos.close();
+
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (requestBuffer.capacity() < requestSize) {
+ requestBuffer = ByteBuffer.allocate(requestSize);
+ } else {
+ requestBuffer.clear();
+ }
+ requestBuffer.putInt(requestType.ordinal());
+ requestBuffer.putInt(oos.size());
+ requestBuffer.put(outputStream.toByteArray());
+ requestBuffer.flip();
+ return requestBuffer;
+ }
+
+ public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+ return LSMIndexFileProperties.create(dis);
+ }
+
+ public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+ return ReplicaLogsRequest.create(dis);
+ }
+
+ public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
+ throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+ request.serialize(oos);
+ oos.close();
+
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (requestBuffer.capacity() < requestSize) {
+ requestBuffer = ByteBuffer.allocate(requestSize);
+ } else {
+ requestBuffer.clear();
+ }
+ requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
+ requestBuffer.putInt(oos.size());
+ requestBuffer.put(outputStream.toByteArray());
+ requestBuffer.flip();
+ return requestBuffer;
+ }
+
+ public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+
+ oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
+ replica.writeFields(oos);
+ oos.close();
+
+ ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+ buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ return buffer;
+ }
+
+ public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+ event.serialize(oos);
+ oos.close();
+
+ ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+ buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
+ }
+
+ public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+ return Replica.create(dis);
+ }
+
+ public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+
+ return ReplicaEvent.create(dis);
+ }
+
+ public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+ request.serialize(oos);
+ oos.close();
+
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (buffer.capacity() < requestSize) {
+ buffer = ByteBuffer.allocate(requestSize);
+ } else {
+ buffer.clear();
+ }
+ buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ }
+
+ public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
+ throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream oos = new DataOutputStream(outputStream);
+ request.serialize(oos);
+ oos.close();
+
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (buffer.capacity() < requestSize) {
+ buffer = ByteBuffer.allocate(requestSize);
+ } else {
+ buffer.clear();
+ }
+ buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
+ }
+
+ public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+ return ReplicaFilesRequest.create(dis);
+ }
+
+ public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ DataInputStream dis = new DataInputStream(bais);
+ return ReplicaIndexFlushRequest.create(dis);
+ }
+
+ public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
+ requestBuffer.clear();
+ requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
+ requestBuffer.flip();
+ }
+
+ public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
+ requestBuffer.clear();
+ requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
+ requestBuffer.flip();
+ }
+
+ public static int getJobIdFromLogAckMessage(String msg) {
+ return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
+ }
+
+ public static String getNodeIdFromLogAckMessage(String msg) {
+ return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
+ }
+
+ /**
+ * Sends a goodbye request to a remote replica indicating the end of a replication request.
+ *
+ * @param socketChannel
+ * the remote replica socket.
+ * @throws IOException
+ */
+ public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
+ ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+ NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
+ }
+
+ public static void sendAck(SocketChannel socketChannel) throws IOException {
+ ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
+ NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 38be05e..a7cfaec 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -27,7 +27,7 @@ import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.management.ReplicationManager;
@@ -53,7 +53,7 @@ public class ReplicationLogBuffer {
}
public void append(ILogRecord logRecord) {
- appendBuffer.putInt(AsterixReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+ appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
appendBuffer.putInt(logRecord.getSerializedLogSize());
appendBuffer.put(logRecord.getSerializedLog());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
index 633d87a..9915c83 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
@@ -30,7 +30,7 @@ import java.util.logging.Logger;
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
public class ReplicaEventNotifier implements Runnable {
@@ -61,7 +61,7 @@ public class ReplicaEventNotifier implements Runnable {
ByteBuffer buffer = null;
try {
- buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+ buffer = ReplicationProtocol.writeReplicaEventRequest(event);
} catch (IOException e) {
e.printStackTrace();
}
@@ -79,7 +79,7 @@ public class ReplicaEventNotifier implements Runnable {
//send replica event
connection.write(buffer);
//send goodbye
- connection.write(AsterixReplicationProtocol.getGoodbyeBuffer());
+ connection.write(ReplicationProtocol.getGoodbyeBuffer());
break;
} catch (IOException | UnresolvedAddressException e) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index 07ed144..0c94c61 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Callable;
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
public class ReplicaStateChecker implements Callable<Void> {
@@ -61,7 +61,7 @@ public class ReplicaStateChecker implements Callable<Void> {
connection = SocketChannel.open();
connection.configureBlocking(true);
connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
- ByteBuffer buffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+ ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
connection.write(buffer);
replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication);
return null;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index e6b2ebf..c97fe94 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -42,7 +42,9 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -59,15 +61,15 @@ import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicaLogsRequest;
import org.apache.asterix.replication.logging.RemoteLogMapping;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
@@ -210,7 +212,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
public void run() {
Thread.currentThread().setName("Replication Thread");
try {
- ReplicationRequestType replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+ ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
inBuffer);
while (replicationFunction != ReplicationRequestType.GOODBYE) {
switch (replicationFunction) {
@@ -251,7 +253,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
throw new IllegalStateException("Unknown replication request");
}
}
- replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
}
} catch (Exception e) {
e.printStackTrace();
@@ -267,9 +269,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
}
private void handleFlushIndex() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
//1. read which indexes are requested to be flushed from remote replica
- ReplicaIndexFlushRequest request = AsterixReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
+ ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
//2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
@@ -302,26 +304,25 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
//the remaining indexes in the requested set are those which cannot be flushed.
//4. respond back to the requester that those indexes cannot be flushed
ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
- outBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
+ outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
}
private void handleLSMComponentProperties() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- LSMComponentProperties lsmCompProp = AsterixReplicationProtocol.readLSMPropertiesRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
//create mask to indicate that this component is not valid yet
replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
}
private void handleReplicateFile() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- AsterixLSMIndexFileProperties afp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
- String replicaFolderPath = replicaResourcesManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(),
- afp.getDataverse(), afp.getIdxName());
-
- String replicaFilePath = replicaFolderPath + File.separator + afp.getFileName();
+ //get index path
+ String indexPath = replicaResourcesManager.getIndexPath(afp);
+ String replicaFilePath = indexPath + File.separator + afp.getFileName();
//create file
File destFile = new File(replicaFilePath);
@@ -334,20 +335,20 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
fileChannel.force(true);
if (afp.requiresAck()) {
- AsterixReplicationProtocol.sendAck(socketChannel);
+ ReplicationProtocol.sendAck(socketChannel);
}
if (afp.isLSMComponentFile()) {
- String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
+ String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
- LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
+ LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
destFile.getAbsolutePath(), afp.getLSNByteOffset());
lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
} else {
- updateLSMComponentRemainingFiles(compoentId);
+ updateLSMComponentRemainingFiles(componentId);
}
} else {
//index metadata file
- replicaResourcesManager.initializeReplicaIndexLSNMap(replicaFolderPath, logManager.getAppendLSN());
+ replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
}
}
}
@@ -370,43 +371,48 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
}
private void handleGetReplicaFiles() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaFilesRequest request = AsterixReplicationProtocol.readReplicaFileRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
- AsterixLSMIndexFileProperties fileProperties = new AsterixLSMIndexFileProperties();
+ LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
List<String> filesList;
Set<String> replicaIds = request.getReplicaIds();
-
+ Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+ .getAppContext()).getMetadataProperties().getNodePartitions();
for (String replicaId : replicaIds) {
- filesList = replicaResourcesManager.getResourcesForReplica(replicaId);
-
- //start sending files
- for (String filePath : filesList) {
- try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
- FileChannel fileChannel = fromFile.getChannel();) {
- long fileSize = fileChannel.size();
- fileProperties.initialize(filePath, fileSize, replicaId, false,
- IMetaDataPageManager.INVALID_LSN_OFFSET, false);
- outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
- ReplicationRequestType.REPLICATE_FILE);
-
- //send file info
- NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-
- //transfer file
- NetworkingUtil.sendFile(fileChannel, socketChannel);
+ //get replica partitions
+ ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
+ for (ClusterPartition partition : replicaPatitions) {
+ filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId());
+
+ //start sending files
+ for (String filePath : filesList) {
+ try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+ FileChannel fileChannel = fromFile.getChannel();) {
+ long fileSize = fileChannel.size();
+ fileProperties.initialize(filePath, fileSize, replicaId, false,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+ outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer,
+ fileProperties, ReplicationRequestType.REPLICATE_FILE);
+
+ //send file info
+ NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+
+ //transfer file
+ NetworkingUtil.sendFile(fileChannel, socketChannel);
+ }
}
}
}
//send goodbye (end of files)
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
private void handleGetRemoteLogs() throws IOException, ACIDException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaLogsRequest request = AsterixReplicationProtocol.readReplicaLogsRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
Set<String> replicaIds = request.getReplicaIds();
long fromLSN = request.getFromLSN();
@@ -433,13 +439,13 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
if (replicaIds.contains(logRecord.getNodeId()) && logRecord.getLogType() != LogType.FLUSH) {
if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
int requestSize = logRecord.getSerializedLogSize()
- + AsterixReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
+ + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
outBuffer = ByteBuffer.allocate(requestSize);
}
//set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
- AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
+ ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
}
logRecord = logReader.next();
@@ -449,32 +455,32 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
}
//send goodbye (end of logs)
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
private void handleUpdateReplica() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- Replica replica = AsterixReplicationProtocol.readReplicaUpdateRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ Replica replica = ReplicationProtocol.readReplicaUpdateRequest(inBuffer);
replicationManager.updateReplicaInfo(replica);
}
private void handleReplicaEvent() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaEvent event = AsterixReplicationProtocol.readReplicaEventRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
replicationManager.reportReplicaEvent(event);
}
private void handleDeleteFile() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- AsterixLSMIndexFileProperties fileProp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
- replicaResourcesManager.deleteRemoteFile(fileProp);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
+ replicaResourcesManager.deleteIndexFile(fileProp);
if (fileProp.requiresAck()) {
- AsterixReplicationProtocol.sendAck(socketChannel);
+ ReplicationProtocol.sendAck(socketChannel);
}
}
private void handleLogReplication() throws IOException, ACIDException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
//Deserialize log
remoteLog.readRemoteLog(inBuffer, false, localNodeID);
@@ -518,7 +524,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
//send ACK to requester
try {
socketChannel.socket().getOutputStream().write(
- (localNodeID + AsterixReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
+ (localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
.getBytes());
socketChannel.socket().getOutputStream().flush();
} catch (IOException e) {
@@ -625,6 +631,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
}
File destFile = new File(syncTask.getComponentFilePath());
+ //prepare local LSN buffer
ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
metadataBuffer.flip();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 36e5dff..5c35df4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -66,17 +66,16 @@ import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicaLogsRequest;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
import org.apache.asterix.replication.logging.ReplicationLogFlusher;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
@@ -129,7 +128,8 @@ public class ReplicationManager implements IReplicationManager {
private ReplicationLogFlusher txnlogsReplicator;
private Future<? extends Object> txnLogReplicatorTask;
private Map<String, SocketChannel> logsReplicaSockets = null;
-
+ //TODO this class needs to be refactored by moving its private classes to separate files
+ //and possibly using MessageBroker to send/receive remote replicas events.
public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
IReplicaResourcesManager remoteResoucesManager, ILogManager logManager,
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
@@ -255,7 +255,7 @@ public class ReplicationManager implements IReplicationManager {
throws IOException {
boolean isLSMComponentFile;
ByteBuffer responseBuffer = null;
- AsterixLSMIndexFileProperties asterixFileProperties = new AsterixLSMIndexFileProperties();
+ LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
if (requestBuffer == null) {
requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
}
@@ -277,7 +277,7 @@ public class ReplicationManager implements IReplicationManager {
//send LSMComponent properties
LSMComponentJob = (ILSMIndexReplicationJob) job;
LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId);
- requestBuffer = AsterixReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
+ requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
requestBuffer);
sendRequest(replicasSockets, requestBuffer);
}
@@ -310,7 +310,7 @@ public class ReplicationManager implements IReplicationManager {
IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
}
- requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
+ requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -350,7 +350,7 @@ public class ReplicationManager implements IReplicationManager {
remainingFiles--;
asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
- AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
+ ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
ReplicationRequestType.DELETE_FILE);
Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -392,13 +392,13 @@ public class ReplicationManager implements IReplicationManager {
private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer)
throws IOException {
if (responseBuffer == null) {
- responseBuffer = ByteBuffer.allocate(AsterixReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
+ responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
} else {
responseBuffer.clear();
}
//read response from remote replicas
- ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+ ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
responseBuffer);
return responseFunction;
}
@@ -519,7 +519,7 @@ public class ReplicationManager implements IReplicationManager {
node.setClusterIp(newAddress);
Replica replica = new Replica(node);
- ByteBuffer buffer = AsterixReplicationProtocol.writeUpdateReplicaRequest(replica);
+ ByteBuffer buffer = ReplicationProtocol.writeUpdateReplicaRequest(replica);
Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
sendRequest(replicaSockets, buffer);
closeReplicaSockets(replicaSockets);
@@ -537,7 +537,7 @@ public class ReplicationManager implements IReplicationManager {
node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet));
Replica replica = new Replica(node);
ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN);
- ByteBuffer buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+ ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
sendRequest(replicaSockets, buffer);
closeReplicaSockets(replicaSockets);
@@ -581,7 +581,7 @@ public class ReplicationManager implements IReplicationManager {
*/
private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
//send goodbye
- ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+ ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
sendRequest(replicaSockets, goodbyeBuffer);
Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
@@ -910,7 +910,7 @@ public class ReplicationManager implements IReplicationManager {
ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
for (String replicaId : replicaIds) {
//1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
- HashMap<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
+ Map<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
nonSharpCheckpointTargetLSN);
if (laggingIndexes.size() > 0) {
@@ -919,7 +919,7 @@ public class ReplicationManager implements IReplicationManager {
try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
ReplicaIndexFlushRequest laggingIndexesRequest = new ReplicaIndexFlushRequest(
laggingIndexes.keySet());
- requestBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
+ requestBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
laggingIndexesRequest);
NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
@@ -927,19 +927,19 @@ public class ReplicationManager implements IReplicationManager {
ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer);
if (responseFunction == ReplicationRequestType.FLUSH_INDEX) {
- requestBuffer = AsterixReplicationProtocol.readRequest(socketChannel, requestBuffer);
+ requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer);
//returning the indexes that were not flushed
- laggingIndexesResponse = AsterixReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
+ laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
}
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
//4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
if (laggingIndexesResponse != null) {
for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
String indexPath = laggingIndexes.get(resouceId);
- HashMap<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
+ Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
}
@@ -953,7 +953,7 @@ public class ReplicationManager implements IReplicationManager {
public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException {
long maxRemoteLSN = 0;
- AsterixReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
+ ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
try {
for (String replicaId : remoteReplicas) {
@@ -988,26 +988,26 @@ public class ReplicationManager implements IReplicationManager {
@Override
public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover) throws IOException {
ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover);
- AsterixReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
+ ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
//transfer request
NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
- String destFolder;
+ String indexPath;
String destFilePath;
-
- ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+ ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
dataBuffer);
- AsterixLSMIndexFileProperties fileProperties;
+ LSMIndexFileProperties fileProperties;
while (responseFunction != ReplicationRequestType.GOODBYE) {
- dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+ dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
+
+ fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer);
- fileProperties = AsterixReplicationProtocol.readFileReplicationRequest(dataBuffer);
- destFolder = replicaResourcesManager.getIndexPath(fileProperties.getNodeId(),
- fileProperties.getIoDeviceNum(), fileProperties.getDataverse(), fileProperties.getIdxName());
- destFilePath = destFolder + File.separator + fileProperties.getFileName();
+ //get index path
+ indexPath = replicaResourcesManager.getIndexPath(fileProperties);
+ destFilePath = indexPath + File.separator + fileProperties.getFileName();
//create file
File destFile = new File(destFilePath);
@@ -1024,14 +1024,14 @@ public class ReplicationManager implements IReplicationManager {
//we need to create LSN map for .metadata files that belong to remote replicas
if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
//replica index
- replicaResourcesManager.initializeReplicaIndexLSNMap(destFolder, logManager.getAppendLSN());
+ replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
}
- responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+ responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
}
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
}
@@ -1039,7 +1039,7 @@ public class ReplicationManager implements IReplicationManager {
@Override
public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
long minLSN = 0;
- AsterixReplicationProtocol.writeMinLSNRequest(dataBuffer);
+ ReplicationProtocol.writeMinLSNRequest(dataBuffer);
try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
//transfer request
NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
@@ -1049,7 +1049,7 @@ public class ReplicationManager implements IReplicationManager {
minLSN = dataBuffer.getLong();
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
return minLSN;
@@ -1060,19 +1060,19 @@ public class ReplicationManager implements IReplicationManager {
public ArrayList<ILogRecord> requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN)
throws IOException, ACIDException {
ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
- dataBuffer = AsterixReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
+ dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
//transfer request
NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
//read response type
- ReplicationRequestType responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+ ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
ArrayList<ILogRecord> recoveryLogs = new ArrayList<ILogRecord>();
ILogRecord logRecord = new LogRecord();
while (responseType != ReplicationRequestType.GOODBYE) {
- dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+ dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
logRecord.readRemoteLog(dataBuffer, true, nodeId);
if (logRecord.getNodeId().equals(nodeId)) {
@@ -1085,11 +1085,11 @@ public class ReplicationManager implements IReplicationManager {
logManager.log(logRecord);
}
- responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+ responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
}
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
return recoveryLogs;
}
}
@@ -1136,11 +1136,7 @@ public class ReplicationManager implements IReplicationManager {
updateReplicaState(replicaId, ReplicaState.DEAD, true);
//delete any invalid LSMComponents for this replica
- try {
- replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
+ replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
}
public void handleShutdownEvent(String replicaId) {
@@ -1237,8 +1233,8 @@ public class ReplicationManager implements IReplicationManager {
break;
}
//read ACK for job commit log
- String replicaId = AsterixReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
- int jobId = AsterixReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
+ String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+ int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
addAckToJob(jobId, replicaId);
}
} catch (AsynchronousCloseException e1) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index a82b535..ee987f8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.replication.recovery;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,10 +31,12 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@@ -55,13 +58,20 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@Override
public void performRemoteRecovery() {
+ //TODO this method needs to be adapted to perform failback when autoFailover is enabled.
+ //Currently we will not allow a node to perform remote recovery since another replica
+ //already tookover its workload and might not resync correctly if there are on on-going
+ //jobs on the replica.
+ if (AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled()) {
+ throw new IllegalStateException("Cannot perform remote recovery when auto failover is enabled.");
+ }
//The whole remote recovery process should be atomic.
//Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
int maxRecoveryAttempts = 10;
PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
.getLocalResourceRepository();
while (true) {
- //start recovery recovery steps
+ //start recovery steps
try {
maxRecoveryAttempts--;
@@ -76,7 +86,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
int activeReplicasCount = replicationManager.getActiveReplicasCount();
if (activeReplicasCount == 0) {
- throw new IllegalStateException("no ACTIVE remote replica(s) exists to performe remote recovery");
+ throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
}
//2. clean any memory data that could've existed from previous failed recovery attempt
@@ -90,8 +100,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
//5. get max LSN from selected remote replicas
- long maxRemoteLSN = 0;
- maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
+ long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
//6. force LogManager to start from a partition > maxLSN in selected remote replicas
logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
@@ -107,8 +116,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
//2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
if (replicasDataToRecover.contains(logManager.getNodeId())) {
((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
- .initializeNewUniverse(
- runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
+ .initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
//initialize resource id factor to correct max resource id
runtimeContext.initializeResourceIdFactory();
}
@@ -140,7 +148,6 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
}
private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-
//1. identify which replicas reside in this node
String localNodeId = logManager.getNodeId();
Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
@@ -205,4 +212,14 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
return recoveryList;
}
+
+ @Override
+ public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException {
+ long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
+ //reply logs > minLSN that belong to these partitions
+ //TODO (mhubail) currently we assume the logs for these partitions belong to the failed node
+ //this needs to be updated once log formats are updated to include the partition id
+ runtimeContext.getTransactionSubsystem().getRecoveryManager().replayPartitionsLogs(partitions, minLSN,
+ failedNode);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
deleted file mode 100644
index 67b39c4..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-
-public class AsterixFilesUtil {
-
- public static void deleteFolder(String folderPath) throws IOException {
- File folder = new File(folderPath);
- if (folder.exists()) {
- //delete files inside the folder
- while (deleteDirecotryFiles(folderPath) != true) {
- //if there is a file being written (e.g. LSM Component), wait and try again to delete the file
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- //ignore
- }
- }
-
- //delete the folder itself
- folder.delete();
- }
- }
-
- private static boolean deleteDirecotryFiles(String dirPath) throws IOException {
- try {
- Path directory = Paths.get(dirPath);
- Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
- Files.delete(dir);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
- return FileVisitResult.CONTINUE;
- }
-
- });
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-}