You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2019/09/20 22:52:48 UTC
[cassandra] branch trunk updated: Untangle RepairMessage
sub-hierarchy of messages, use new messaging (more) correctly
This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f9ff884 Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly
f9ff884 is described below
commit f9ff88437742675db5c53f5834884b43f8937e00
Author: Aleksey Yeschenko <al...@apache.org>
AuthorDate: Fri Jun 14 14:49:46 2019 +0100
Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly
patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
CASSANDRA-15163
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/net/Verb.java | 152 +++++++++++++--------
.../cassandra/repair/AsymmetricRemoteSyncTask.java | 4 +-
.../cassandra/repair/RepairMessageVerbHandler.java | 36 ++---
.../org/apache/cassandra/repair/SnapshotTask.java | 4 +-
.../cassandra/repair/StreamingRepairTask.java | 10 +-
.../cassandra/repair/SymmetricRemoteSyncTask.java | 6 +-
.../apache/cassandra/repair/ValidationTask.java | 4 +-
.../org/apache/cassandra/repair/Validator.java | 16 +--
.../repair/consistent/CoordinatorSession.java | 16 +--
.../cassandra/repair/consistent/LocalSessions.java | 46 ++++---
.../repair/messages/AsymmetricSyncRequest.java | 14 +-
.../cassandra/repair/messages/CleanupMessage.java | 13 +-
.../cassandra/repair/messages/FailSession.java | 5 +-
.../cassandra/repair/messages/FinalizeCommit.java | 5 +-
.../cassandra/repair/messages/FinalizePromise.java | 5 +-
.../cassandra/repair/messages/FinalizePropose.java | 5 +-
.../repair/messages/PrepareConsistentRequest.java | 6 +-
.../repair/messages/PrepareConsistentResponse.java | 5 +-
.../cassandra/repair/messages/PrepareMessage.java | 24 +++-
.../cassandra/repair/messages/RepairMessage.java | 85 +-----------
.../cassandra/repair/messages/SnapshotMessage.java | 13 +-
.../cassandra/repair/messages/StatusRequest.java | 5 +-
.../cassandra/repair/messages/StatusResponse.java | 5 +-
.../cassandra/repair/messages/SyncRequest.java | 14 +-
.../{SyncComplete.java => SyncResponse.java} | 34 +++--
.../repair/messages/ValidationRequest.java | 9 +-
...dationComplete.java => ValidationResponse.java} | 42 +++---
.../cassandra/service/ActiveRepairService.java | 22 ++-
.../serialization/4.0/service.SyncComplete.bin | Bin 258 -> 256 bytes
.../data/serialization/4.0/service.SyncRequest.bin | Bin 111 -> 110 bytes
.../4.0/service.ValidationComplete.bin | Bin 600 -> 597 bytes
.../4.0/service.ValidationRequest.bin | Bin 75 -> 74 bytes
.../org/apache/cassandra/repair/RepairJobTest.java | 22 +--
.../repair/SymmetricRemoteSyncTaskTest.java | 2 +-
.../org/apache/cassandra/repair/ValidatorTest.java | 37 +++--
.../consistent/CoordinatorMessagingTest.java | 44 +++---
.../repair/consistent/CoordinatorSessionTest.java | 5 +-
.../repair/consistent/LocalSessionTest.java | 6 +-
.../messages/RepairMessageSerializationsTest.java | 18 +--
.../messages/RepairMessageSerializerTest.java | 41 +++---
.../cassandra/service/SerializationsTest.java | 81 +++++------
42 files changed, 400 insertions(+), 462 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2929a98..1a3df81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha2
+ * Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly (CASSANDRA-15163)
* Add `allocate_tokens_for_local_replication_factor` option for token allocation (CASSANDRA-15260)
* Add Alibaba Cloud Platform snitch (CASSANDRA-15092)
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 6463a5a..67d847e 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -56,7 +56,22 @@ import org.apache.cassandra.hints.HintMessage;
import org.apache.cassandra.hints.HintVerbHandler;
import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
import org.apache.cassandra.repair.RepairMessageVerbHandler;
-import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.CleanupMessage;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.PrepareMessage;
+import org.apache.cassandra.repair.messages.SnapshotMessage;
+import org.apache.cassandra.repair.messages.StatusRequest;
+import org.apache.cassandra.repair.messages.StatusResponse;
+import org.apache.cassandra.repair.messages.SyncResponse;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.repair.messages.ValidationResponse;
+import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.schema.SchemaPullVerbHandler;
import org.apache.cassandra.schema.SchemaPushVerbHandler;
import org.apache.cassandra.schema.SchemaVersionVerbHandler;
@@ -84,75 +99,92 @@ import static org.apache.cassandra.schema.MigrationManager.MigrationsSerializer;
*/
public enum Verb
{
- MUTATION_RSP (60, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- MUTATION_REQ (0, P3, writeTimeout, MUTATION, () -> Mutation.serializer, () -> MutationVerbHandler.instance, MUTATION_RSP ),
- HINT_RSP (61, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- HINT_REQ (1, P4, writeTimeout, MUTATION, () -> HintMessage.serializer, () -> HintVerbHandler.instance, HINT_RSP ),
- READ_REPAIR_RSP (62, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- READ_REPAIR_REQ (2, P1, writeTimeout, MUTATION, () -> Mutation.serializer, () -> ReadRepairVerbHandler.instance, READ_REPAIR_RSP ),
- BATCH_STORE_RSP (65, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- BATCH_STORE_REQ (5, P3, writeTimeout, MUTATION, () -> Batch.serializer, () -> BatchStoreVerbHandler.instance, BATCH_STORE_RSP ),
- BATCH_REMOVE_RSP (66, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- BATCH_REMOVE_REQ (6, P3, writeTimeout, MUTATION, () -> UUIDSerializer.serializer, () -> BatchRemoveVerbHandler.instance, BATCH_REMOVE_RSP ),
-
- PAXOS_PREPARE_RSP (93, P2, writeTimeout, REQUEST_RESPONSE, () -> PrepareResponse.serializer, () -> ResponseVerbHandler.instance ),
- PAXOS_PREPARE_REQ (33, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> PrepareVerbHandler.instance, PAXOS_PREPARE_RSP ),
- PAXOS_PROPOSE_RSP (94, P2, writeTimeout, REQUEST_RESPONSE, () -> BooleanSerializer.serializer, () -> ResponseVerbHandler.instance ),
- PAXOS_PROPOSE_REQ (34, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> ProposeVerbHandler.instance, PAXOS_PROPOSE_RSP ),
- PAXOS_COMMIT_RSP (95, P2, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- PAXOS_COMMIT_REQ (35, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> CommitVerbHandler.instance, PAXOS_COMMIT_RSP ),
-
- TRUNCATE_RSP (79, P0, truncateTimeout, REQUEST_RESPONSE, () -> TruncateResponse.serializer, () -> ResponseVerbHandler.instance ),
- TRUNCATE_REQ (19, P0, truncateTimeout, MUTATION, () -> TruncateRequest.serializer, () -> TruncateVerbHandler.instance, TRUNCATE_RSP ),
-
- COUNTER_MUTATION_RSP (84, P1, counterTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- COUNTER_MUTATION_REQ (24, P2, counterTimeout, COUNTER_MUTATION, () -> CounterMutation.serializer, () -> CounterMutationVerbHandler.instance, COUNTER_MUTATION_RSP),
-
- READ_RSP (63, P2, readTimeout, REQUEST_RESPONSE, () -> ReadResponse.serializer, () -> ResponseVerbHandler.instance ),
- READ_REQ (3, P3, readTimeout, READ, () -> ReadCommand.serializer, () -> ReadCommandVerbHandler.instance, READ_RSP ),
- RANGE_RSP (69, P2, rangeTimeout, REQUEST_RESPONSE, () -> ReadResponse.serializer, () -> ResponseVerbHandler.instance ),
- RANGE_REQ (9, P3, rangeTimeout, READ, () -> ReadCommand.serializer, () -> ReadCommandVerbHandler.instance, RANGE_RSP ),
-
- GOSSIP_DIGEST_SYN (14, P0, longTimeout, GOSSIP, () -> GossipDigestSyn.serializer, () -> GossipDigestSynVerbHandler.instance ),
- GOSSIP_DIGEST_ACK (15, P0, longTimeout, GOSSIP, () -> GossipDigestAck.serializer, () -> GossipDigestAckVerbHandler.instance ),
- GOSSIP_DIGEST_ACK2 (16, P0, longTimeout, GOSSIP, () -> GossipDigestAck2.serializer, () -> GossipDigestAck2VerbHandler.instance ),
- GOSSIP_SHUTDOWN (29, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> GossipShutdownVerbHandler.instance ),
-
- ECHO_RSP (91, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- ECHO_REQ (31, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> EchoVerbHandler.instance, ECHO_RSP ),
- PING_RSP (97, P1, pingTimeout, GOSSIP, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- PING_REQ (37, P1, pingTimeout, GOSSIP, () -> PingRequest.serializer, () -> PingVerbHandler.instance, PING_RSP ),
+ MUTATION_RSP (60, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ MUTATION_REQ (0, P3, writeTimeout, MUTATION, () -> Mutation.serializer, () -> MutationVerbHandler.instance, MUTATION_RSP ),
+ HINT_RSP (61, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ HINT_REQ (1, P4, writeTimeout, MUTATION, () -> HintMessage.serializer, () -> HintVerbHandler.instance, HINT_RSP ),
+ READ_REPAIR_RSP (62, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ READ_REPAIR_REQ (2, P1, writeTimeout, MUTATION, () -> Mutation.serializer, () -> ReadRepairVerbHandler.instance, READ_REPAIR_RSP ),
+ BATCH_STORE_RSP (65, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ BATCH_STORE_REQ (5, P3, writeTimeout, MUTATION, () -> Batch.serializer, () -> BatchStoreVerbHandler.instance, BATCH_STORE_RSP ),
+ BATCH_REMOVE_RSP (66, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ BATCH_REMOVE_REQ (6, P3, writeTimeout, MUTATION, () -> UUIDSerializer.serializer, () -> BatchRemoveVerbHandler.instance, BATCH_REMOVE_RSP ),
+
+ PAXOS_PREPARE_RSP (93, P2, writeTimeout, REQUEST_RESPONSE, () -> PrepareResponse.serializer, () -> ResponseVerbHandler.instance ),
+ PAXOS_PREPARE_REQ (33, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> PrepareVerbHandler.instance, PAXOS_PREPARE_RSP ),
+ PAXOS_PROPOSE_RSP (94, P2, writeTimeout, REQUEST_RESPONSE, () -> BooleanSerializer.serializer, () -> ResponseVerbHandler.instance ),
+ PAXOS_PROPOSE_REQ (34, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> ProposeVerbHandler.instance, PAXOS_PROPOSE_RSP ),
+ PAXOS_COMMIT_RSP (95, P2, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ PAXOS_COMMIT_REQ (35, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> CommitVerbHandler.instance, PAXOS_COMMIT_RSP ),
+
+ TRUNCATE_RSP (79, P0, truncateTimeout, REQUEST_RESPONSE, () -> TruncateResponse.serializer, () -> ResponseVerbHandler.instance ),
+ TRUNCATE_REQ (19, P0, truncateTimeout, MUTATION, () -> TruncateRequest.serializer, () -> TruncateVerbHandler.instance, TRUNCATE_RSP ),
+
+ COUNTER_MUTATION_RSP (84, P1, counterTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ COUNTER_MUTATION_REQ (24, P2, counterTimeout, COUNTER_MUTATION, () -> CounterMutation.serializer, () -> CounterMutationVerbHandler.instance, COUNTER_MUTATION_RSP),
+
+ READ_RSP (63, P2, readTimeout, REQUEST_RESPONSE, () -> ReadResponse.serializer, () -> ResponseVerbHandler.instance ),
+ READ_REQ (3, P3, readTimeout, READ, () -> ReadCommand.serializer, () -> ReadCommandVerbHandler.instance, READ_RSP ),
+ RANGE_RSP (69, P2, rangeTimeout, REQUEST_RESPONSE, () -> ReadResponse.serializer, () -> ResponseVerbHandler.instance ),
+ RANGE_REQ (9, P3, rangeTimeout, READ, () -> ReadCommand.serializer, () -> ReadCommandVerbHandler.instance, RANGE_RSP ),
+
+ GOSSIP_DIGEST_SYN (14, P0, longTimeout, GOSSIP, () -> GossipDigestSyn.serializer, () -> GossipDigestSynVerbHandler.instance ),
+ GOSSIP_DIGEST_ACK (15, P0, longTimeout, GOSSIP, () -> GossipDigestAck.serializer, () -> GossipDigestAckVerbHandler.instance ),
+ GOSSIP_DIGEST_ACK2 (16, P0, longTimeout, GOSSIP, () -> GossipDigestAck2.serializer, () -> GossipDigestAck2VerbHandler.instance ),
+ GOSSIP_SHUTDOWN (29, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> GossipShutdownVerbHandler.instance ),
+
+ ECHO_RSP (91, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ ECHO_REQ (31, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> EchoVerbHandler.instance, ECHO_RSP ),
+ PING_RSP (97, P1, pingTimeout, GOSSIP, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ PING_REQ (37, P1, pingTimeout, GOSSIP, () -> PingRequest.serializer, () -> PingVerbHandler.instance, PING_RSP ),
// P1 because messages can be arbitrarily large or aren't crucial
- SCHEMA_PUSH_RSP (98, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- SCHEMA_PUSH_REQ (18, P1, rpcTimeout, MIGRATION, () -> MigrationsSerializer.instance, () -> SchemaPushVerbHandler.instance, SCHEMA_PUSH_RSP ),
- SCHEMA_PULL_RSP (88, P1, rpcTimeout, MIGRATION, () -> MigrationsSerializer.instance, () -> ResponseVerbHandler.instance ),
- SCHEMA_PULL_REQ (28, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaPullVerbHandler.instance, SCHEMA_PULL_RSP ),
- SCHEMA_VERSION_RSP (80, P1, rpcTimeout, MIGRATION, () -> UUIDSerializer.serializer, () -> ResponseVerbHandler.instance ),
- SCHEMA_VERSION_REQ (20, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaVersionVerbHandler.instance, SCHEMA_VERSION_RSP ),
- REPAIR_RSP (92, P1, rpcTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- REPAIR_REQ (32, P1, rpcTimeout, ANTI_ENTROPY, () -> RepairMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
-
- REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
- SNAPSHOT_RSP (87, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
- SNAPSHOT_REQ (27, P0, rpcTimeout, MISC, () -> SnapshotCommand.serializer, () -> SnapshotVerbHandler.instance, SNAPSHOT_RSP ),
+ SCHEMA_PUSH_RSP (98, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ SCHEMA_PUSH_REQ (18, P1, rpcTimeout, MIGRATION, () -> MigrationsSerializer.instance, () -> SchemaPushVerbHandler.instance, SCHEMA_PUSH_RSP ),
+ SCHEMA_PULL_RSP (88, P1, rpcTimeout, MIGRATION, () -> MigrationsSerializer.instance, () -> ResponseVerbHandler.instance ),
+ SCHEMA_PULL_REQ (28, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaPullVerbHandler.instance, SCHEMA_PULL_RSP ),
+ SCHEMA_VERSION_RSP (80, P1, rpcTimeout, MIGRATION, () -> UUIDSerializer.serializer, () -> ResponseVerbHandler.instance ),
+ SCHEMA_VERSION_REQ (20, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaVersionVerbHandler.instance, SCHEMA_VERSION_RSP ),
+
+ // repair; mostly doesn't use callbacks and sends responses as their own request messages, with matching sessions by uuid; should eventually harmonize and make idiomatic
+ REPAIR_RSP (100, P1, rpcTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ VALIDATION_RSP (102, P1, rpcTimeout, ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ VALIDATION_REQ (101, P1, rpcTimeout, ANTI_ENTROPY, () -> ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ SYNC_RSP (104, P1, rpcTimeout, ANTI_ENTROPY, () -> SyncResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ SYNC_REQ (103, P1, rpcTimeout, ANTI_ENTROPY, () -> SyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ PREPARE_MSG (105, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ SNAPSHOT_MSG (106, P1, rpcTimeout, ANTI_ENTROPY, () -> SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ CLEANUP_MSG (107, P1, rpcTimeout, ANTI_ENTROPY, () -> CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ FINALIZE_PROPOSE_MSG (110, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ FINALIZE_PROMISE_MSG (111, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ FINALIZE_COMMIT_MSG (112, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ FAILED_SESSION_MSG (113, P1, rpcTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ STATUS_RSP (115, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ STATUS_REQ (114, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+ ASYMMETRIC_SYNC_REQ (116, P1, rpcTimeout, ANTI_ENTROPY, () -> AsymmetricSyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ),
+
+ REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
+ SNAPSHOT_RSP (87, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ),
+ SNAPSHOT_REQ (27, P0, rpcTimeout, MISC, () -> SnapshotCommand.serializer, () -> SnapshotVerbHandler.instance, SNAPSHOT_RSP ),
// generic failure response
- FAILURE_RSP (99, P0, noTimeout, REQUEST_RESPONSE, () -> RequestFailureReason.serializer, () -> ResponseVerbHandler.instance ),
+ FAILURE_RSP (99, P0, noTimeout, REQUEST_RESPONSE, () -> RequestFailureReason.serializer, () -> ResponseVerbHandler.instance ),
// dummy verbs
- _TRACE (30, P1, rpcTimeout, TRACING, () -> NoPayload.serializer, () -> null ),
- _SAMPLE (42, P1, rpcTimeout, INTERNAL_RESPONSE, () -> NoPayload.serializer, () -> null ),
- _TEST_1 (10, P0, writeTimeout, IMMEDIATE, () -> NoPayload.serializer, () -> null ),
- _TEST_2 (11, P1, rpcTimeout, IMMEDIATE, () -> NoPayload.serializer, () -> null ),
+ _TRACE (30, P1, rpcTimeout, TRACING, () -> NoPayload.serializer, () -> null ),
+ _SAMPLE (42, P1, rpcTimeout, INTERNAL_RESPONSE, () -> NoPayload.serializer, () -> null ),
+ _TEST_1 (10, P0, writeTimeout, IMMEDIATE, () -> NoPayload.serializer, () -> null ),
+ _TEST_2 (11, P1, rpcTimeout, IMMEDIATE, () -> NoPayload.serializer, () -> null ),
@Deprecated
- REQUEST_RSP (4, P1, rpcTimeout, REQUEST_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ),
+ REQUEST_RSP (4, P1, rpcTimeout, REQUEST_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ),
@Deprecated
- INTERNAL_RSP (23, P1, rpcTimeout, INTERNAL_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ),
+ INTERNAL_RSP (23, P1, rpcTimeout, INTERNAL_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ),
- // largest used ID: 99
+ // largest used ID: 116
;
public static final List<Verb> VERBS = ImmutableList.copyOf(Verb.values());
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 68a5824..cf6d84b 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.ASYMMETRIC_SYNC_REQ;
/**
* AsymmetricRemoteSyncTask sends {@link AsymmetricSyncRequest} to target node to repair(stream)
@@ -53,7 +53,7 @@ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRem
AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
Tracing.traceRepair(message);
- MessagingService.instance().send(Message.out(REPAIR_REQ, request), request.fetchingNode);
+ MessagingService.instance().send(Message.out(ASYMMETRIC_SYNC_REQ, request), request.fetchingNode);
}
public void syncComplete(boolean success, List<SessionSummary> summaries)
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2a87fa2..27ffd05 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_RSP;
/**
* Handles all repair related message.
@@ -62,9 +62,9 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
RepairJobDesc desc = message.payload.desc;
try
{
- switch (message.payload.messageType)
+ switch (message.verb())
{
- case PREPARE_MESSAGE:
+ case PREPARE_MSG:
PrepareMessage prepareMessage = (PrepareMessage) message.payload;
logger.debug("Preparing, {}", prepareMessage);
List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.tableIds.size());
@@ -90,7 +90,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
MessagingService.instance().send(message.emptyResponse(), message.from());
break;
- case SNAPSHOT:
+ case SNAPSHOT_MSG:
logger.debug("Snapshotting {}", desc);
final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
if (cfs == null)
@@ -114,7 +114,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
MessagingService.instance().send(message.emptyResponse(), message.from());
break;
- case VALIDATION_REQUEST:
+ case VALIDATION_REQ:
ValidationRequest validationRequest = (ValidationRequest) message.payload;
logger.debug("Validating {}", validationRequest);
// trigger read-only compaction
@@ -122,7 +122,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
if (store == null)
{
logger.error("Table {}.{} was dropped during snapshot phase of repair", desc.keyspace, desc.columnFamily);
- MessagingService.instance().send(Message.out(REPAIR_REQ, new ValidationComplete(desc)), message.from());
+ MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from());
return;
}
@@ -132,7 +132,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
ValidationManager.instance.submitValidation(store, validator);
break;
- case SYNC_REQUEST:
+ case SYNC_REQ:
// forwarded sync request
SyncRequest request = (SyncRequest) message.payload;
logger.debug("Syncing {}", request);
@@ -147,7 +147,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
task.run();
break;
- case ASYMMETRIC_SYNC_REQUEST:
+ case ASYMMETRIC_SYNC_REQ:
// forwarded sync request
AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload;
logger.debug("Syncing {}", asymmetricSyncRequest);
@@ -162,49 +162,49 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
asymmetricTask.run();
break;
- case CLEANUP:
+ case CLEANUP_MSG:
logger.debug("cleaning up repair");
CleanupMessage cleanup = (CleanupMessage) message.payload;
ActiveRepairService.instance.removeParentRepairSession(cleanup.parentRepairSession);
MessagingService.instance().send(message.emptyResponse(), message.from());
break;
- case CONSISTENT_REQUEST:
+ case PREPARE_CONSISTENT_REQ:
ActiveRepairService.instance.consistent.local.handlePrepareMessage(message.from(), (PrepareConsistentRequest) message.payload);
break;
- case CONSISTENT_RESPONSE:
+ case PREPARE_CONSISTENT_RSP:
ActiveRepairService.instance.consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse) message.payload);
break;
- case FINALIZE_PROPOSE:
+ case FINALIZE_PROPOSE_MSG:
ActiveRepairService.instance.consistent.local.handleFinalizeProposeMessage(message.from(), (FinalizePropose) message.payload);
break;
- case FINALIZE_PROMISE:
+ case FINALIZE_PROMISE_MSG:
ActiveRepairService.instance.consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise) message.payload);
break;
- case FINALIZE_COMMIT:
+ case FINALIZE_COMMIT_MSG:
ActiveRepairService.instance.consistent.local.handleFinalizeCommitMessage(message.from(), (FinalizeCommit) message.payload);
break;
- case FAILED_SESSION:
+ case FAILED_SESSION_MSG:
FailSession failure = (FailSession) message.payload;
ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(), failure);
break;
- case STATUS_REQUEST:
+ case STATUS_REQ:
ActiveRepairService.instance.consistent.local.handleStatusRequest(message.from(), (StatusRequest) message.payload);
break;
- case STATUS_RESPONSE:
+ case STATUS_RSP:
ActiveRepairService.instance.consistent.local.handleStatusResponse(message.from(), (StatusResponse) message.payload);
break;
default:
- ActiveRepairService.instance.handleMessage(message.from(), message.payload);
+ ActiveRepairService.instance.handleMessage(message);
break;
}
}
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index fab4b28..40e4b3d 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SnapshotMessage;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.SNAPSHOT_MSG;
/**
* SnapshotTask is a task that sends snapshot request.
@@ -46,7 +46,7 @@ public class SnapshotTask extends AbstractFuture<InetAddressAndPort> implements
public void run()
{
- MessagingService.instance().sendWithCallback(Message.out(REPAIR_REQ, new SnapshotMessage(desc)),
+ MessagingService.instance().sendWithCallback(Message.out(SNAPSHOT_MSG, new SnapshotMessage(desc)),
endpoint,
new SnapshotCallback(this));
}
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 53407c9..827dce3 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.SyncResponse;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
@@ -39,11 +39,11 @@ import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.StreamOperation;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.SYNC_RSP;
/**
* StreamingRepairTask performs data streaming between two remote replicas, neither of which is repair coordinator.
- * Task will send {@link SyncComplete} message back to coordinator upon streaming completion.
+ * Task will send {@link SyncResponse} message back to coordinator upon streaming completion.
*/
public class StreamingRepairTask implements Runnable, StreamEventHandler
{
@@ -103,7 +103,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
public void onSuccess(StreamState state)
{
logger.info("[repair #{}] streaming task succeed, returning response to {}", desc.sessionId, initiator);
- MessagingService.instance().send(Message.out(REPAIR_REQ, new SyncComplete(desc, src, dst, true, state.createSummaries())), initiator);
+ MessagingService.instance().send(Message.out(SYNC_RSP, new SyncResponse(desc, src, dst, true, state.createSummaries())), initiator);
}
/**
@@ -111,6 +111,6 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
*/
public void onFailure(Throwable t)
{
- MessagingService.instance().send(Message.out(REPAIR_REQ, new SyncComplete(desc, src, dst, false, Collections.emptyList())), initiator);
+ MessagingService.instance().send(Message.out(SYNC_RSP, new SyncResponse(desc, src, dst, false, Collections.emptyList())), initiator);
}
}
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index b608d67..181554a 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
/**
* SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node
@@ -53,9 +53,9 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo
super(desc, r1, r2, differences, previewKind);
}
- void sendRequest(RepairMessage request, InetAddressAndPort to)
+ void sendRequest(SyncRequest request, InetAddressAndPort to)
{
- MessagingService.instance().send(Message.out(REPAIR_REQ, request), to);
+ MessagingService.instance().send(Message.out(SYNC_REQ, request), to);
}
@Override
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 1892a59..0161acf 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.MerkleTrees;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
/**
* ValidationTask sends {@link ValidationRequest} to a replica.
@@ -54,7 +54,7 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn
public void run()
{
ValidationRequest request = new ValidationRequest(desc, nowInSec);
- MessagingService.instance().send(Message.out(REPAIR_REQ, request), endpoint);
+ MessagingService.instance().send(Message.out(VALIDATION_REQ, request), endpoint);
}
/**
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 9a89fa6..0d6fdb4 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.tracing.Tracing;
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTree.RowHash;
import org.apache.cassandra.utils.MerkleTrees;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_RSP;
/**
* Handles the building of a merkle tree for a column family.
@@ -391,7 +391,7 @@ public class Validator implements Runnable
public void fail()
{
logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator);
- respond(new ValidationComplete(desc));
+ respond(new ValidationResponse(desc));
}
/**
@@ -410,7 +410,7 @@ public class Validator implements Runnable
Tracing.traceRepair("Local completed merkle tree for {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
}
- respond(new ValidationComplete(desc, trees));
+ respond(new ValidationResponse(desc, trees));
}
private boolean initiatorIsRemote()
@@ -418,11 +418,11 @@ public class Validator implements Runnable
return !FBUtilities.getBroadcastAddressAndPort().equals(initiator);
}
- private void respond(ValidationComplete response)
+ private void respond(ValidationResponse response)
{
if (initiatorIsRemote())
{
- MessagingService.instance().send(Message.out(REPAIR_REQ, response), initiator);
+ MessagingService.instance().send(Message.out(VALIDATION_RSP, response), initiator);
return;
}
@@ -434,7 +434,7 @@ public class Validator implements Runnable
*/
StageManager.getStage(Stage.ANTI_ENTROPY).execute(() ->
{
- ValidationComplete movedResponse = response;
+ ValidationResponse movedResponse = response;
try
{
movedResponse = response.tryMoveOffHeap();
@@ -443,7 +443,7 @@ public class Validator implements Runnable
{
logger.error("Failed to move local merkle tree for {} off heap", desc, e);
}
- ActiveRepairService.instance.handleMessage(initiator, movedResponse);
+ ActiveRepairService.instance.handleMessage(Message.out(VALIDATION_RSP, movedResponse));
});
}
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 28f5d08..8f1759a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -139,11 +139,10 @@ public class CoordinatorSession extends ConsistentSession
return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED);
}
- protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+ protected void sendMessage(InetAddressAndPort destination, Message<RepairMessage> message)
{
- logger.trace("Sending {} to {}", message, destination);
- Message<RepairMessage> messageOut = Message.out(Verb.REPAIR_REQ, message);
- MessagingService.instance().send(messageOut, destination);
+ logger.trace("Sending {} to {}", message.payload, destination);
+ MessagingService.instance().send(message, destination);
}
public ListenableFuture<Boolean> prepare()
@@ -151,7 +150,8 @@ public class CoordinatorSession extends ConsistentSession
Preconditions.checkArgument(allStates(State.PREPARING));
logger.debug("Beginning prepare phase of incremental repair session {}", sessionID);
- PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants);
+ Message<RepairMessage> message =
+ Message.out(Verb.PREPARE_CONSISTENT_REQ, new PrepareConsistentRequest(sessionID, coordinator, participants));
for (final InetAddressAndPort participant : participants)
{
sendMessage(participant, message);
@@ -198,7 +198,7 @@ public class CoordinatorSession extends ConsistentSession
{
Preconditions.checkArgument(allStates(State.REPAIRING));
logger.debug("Proposing finalization of repair session {}", sessionID);
- FinalizePropose message = new FinalizePropose(sessionID);
+ Message<RepairMessage> message = Message.out(Verb.FINALIZE_PROPOSE_MSG, new FinalizePropose(sessionID));
for (final InetAddressAndPort participant : participants)
{
sendMessage(participant, message);
@@ -234,7 +234,7 @@ public class CoordinatorSession extends ConsistentSession
{
Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
logger.debug("Committing finalization of repair session {}", sessionID);
- FinalizeCommit message = new FinalizeCommit(sessionID);
+ Message<RepairMessage> message = Message.out(Verb.FINALIZE_COMMIT_MSG, new FinalizeCommit(sessionID));
for (final InetAddressAndPort participant : participants)
{
sendMessage(participant, message);
@@ -245,7 +245,7 @@ public class CoordinatorSession extends ConsistentSession
private void sendFailureMessageToParticipants()
{
- FailSession message = new FailSession(sessionID);
+ Message<RepairMessage> message = Message.out(Verb.FAILED_SESSION_MSG, new FailSession(sessionID));
for (final InetAddressAndPort participant : participants)
{
if (participantStates.get(participant) != State.FAILED)
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index e93ccb0..935bba8 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -88,7 +88,11 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.FAILED_SESSION_MSG;
+import static org.apache.cassandra.net.Verb.FINALIZE_PROMISE_MSG;
+import static org.apache.cassandra.net.Verb.PREPARE_CONSISTENT_RSP;
+import static org.apache.cassandra.net.Verb.STATUS_REQ;
+import static org.apache.cassandra.net.Verb.STATUS_RSP;
import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
/**
@@ -190,10 +194,11 @@ public class LocalSessions
sessionID, session.coordinator);
setStateAndSave(session, FAILED);
+ Message<FailSession> message = Message.out(FAILED_SESSION_MSG, new FailSession(sessionID));
for (InetAddressAndPort participant : session.participants)
{
if (!participant.equals(getBroadcastAddressAndPort()))
- sendMessage(participant, new FailSession(sessionID));
+ sendMessage(participant, message);
}
}
@@ -491,11 +496,10 @@ public class LocalSessions
return ActiveRepairService.instance.getParentRepairSession(sessionID);
}
- protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+ protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message)
{
- logger.trace("sending {} to {}", message, destination);
- Message<RepairMessage> messageOut = Message.out(REPAIR_REQ, message);
- MessagingService.instance().send(messageOut, destination);
+ logger.trace("sending {} to {}", message.payload, destination);
+ MessagingService.instance().send(message, destination);
}
private void setStateAndSave(LocalSession session, ConsistentSession.State state)
@@ -538,7 +542,7 @@ public class LocalSessions
}
if (sendMessage)
{
- sendMessage(session.coordinator, new FailSession(sessionID));
+ sendMessage(session.coordinator, Message.out(FAILED_SESSION_MSG, new FailSession(sessionID)));
}
}
}
@@ -609,7 +613,7 @@ public class LocalSessions
catch (Throwable e)
{
logger.error("Error retrieving ParentRepairSession for session {}, responding with failure", sessionID);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
+ sendMessage(coordinator, Message.out(PREPARE_CONSISTENT_RSP, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
return;
}
@@ -632,15 +636,14 @@ public class LocalSessions
{
logger.info("Prepare phase for incremental repair session {} completed", sessionID);
if (session.getState() != FAILED)
- {
setStateAndSave(session, PREPARED);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
- }
else
- {
logger.info("Session {} failed before anticompaction completed", sessionID);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
- }
+
+ Message<PrepareConsistentResponse> message =
+ Message.out(PREPARE_CONSISTENT_RSP,
+ new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), session.getState() != FAILED));
+ sendMessage(coordinator, message);
}
finally
{
@@ -653,7 +656,9 @@ public class LocalSessions
try
{
logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
+ sendMessage(coordinator,
+ Message.out(PREPARE_CONSISTENT_RSP,
+ new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
failSession(sessionID, false);
}
finally
@@ -682,7 +687,7 @@ public class LocalSessions
if (session == null)
{
logger.debug("Received FinalizePropose message for unknown repair session {}, responding with failure", sessionID);
- sendMessage(from, new FailSession(sessionID));
+ sendMessage(from, Message.out(FAILED_SESSION_MSG, new FailSession(sessionID)));
return;
}
@@ -699,7 +704,7 @@ public class LocalSessions
*/
syncTable();
- sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true));
+ sendMessage(from, Message.out(FINALIZE_PROMISE_MSG, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true)));
logger.debug("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", sessionID);
}
catch (IllegalArgumentException e)
@@ -753,7 +758,8 @@ public class LocalSessions
public void sendStatusRequest(LocalSession session)
{
logger.debug("Attempting to learn the outcome of unfinished local incremental repair session {}", session.sessionID);
- StatusRequest request = new StatusRequest(session.sessionID);
+ Message<StatusRequest> request = Message.out(STATUS_REQ, new StatusRequest(session.sessionID));
+
for (InetAddressAndPort participant : session.participants)
{
if (!getBroadcastAddressAndPort().equals(participant) && isAlive(participant))
@@ -771,11 +777,11 @@ public class LocalSessions
if (session == null)
{
logger.warn("Received status response message for unknown session {}", sessionID);
- sendMessage(from, new StatusResponse(sessionID, FAILED));
+ sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, FAILED)));
}
else
{
- sendMessage(from, new StatusResponse(sessionID, session.getState()));
+ sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, session.getState())));
logger.debug("Responding to status response message for incremental repair session {} with local state {}", sessionID, session.getState());
}
}
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
index 0a6d257..eacc285 100644
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -38,8 +39,6 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
public class AsymmetricSyncRequest extends RepairMessage
{
- public static MessageSerializer serializer = new SyncRequestSerializer();
-
public final InetAddressAndPort initiator;
public final InetAddressAndPort fetchingNode;
public final InetAddressAndPort fetchFrom;
@@ -48,7 +47,7 @@ public class AsymmetricSyncRequest extends RepairMessage
public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
{
- super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
+ super(desc);
this.initiator = initiator;
this.fetchingNode = fetchingNode;
this.fetchFrom = fetchFrom;
@@ -62,8 +61,7 @@ public class AsymmetricSyncRequest extends RepairMessage
if (!(o instanceof AsymmetricSyncRequest))
return false;
AsymmetricSyncRequest req = (AsymmetricSyncRequest)o;
- return messageType == req.messageType &&
- desc.equals(req.desc) &&
+ return desc.equals(req.desc) &&
initiator.equals(req.initiator) &&
fetchingNode.equals(req.fetchingNode) &&
fetchFrom.equals(req.fetchFrom) &&
@@ -73,10 +71,10 @@ public class AsymmetricSyncRequest extends RepairMessage
@Override
public int hashCode()
{
- return Objects.hash(messageType, desc, initiator, fetchingNode, fetchFrom, ranges);
+ return Objects.hash(desc, initiator, fetchingNode, fetchFrom, ranges);
}
- public static class SyncRequestSerializer implements MessageSerializer<AsymmetricSyncRequest>
+ public static final IVersionedSerializer<AsymmetricSyncRequest> serializer = new IVersionedSerializer<AsymmetricSyncRequest>()
{
public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
{
@@ -119,7 +117,7 @@ public class AsymmetricSyncRequest extends RepairMessage
size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
return size;
}
- }
+ };
public String toString()
{
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
index 69d147a..5ec7fc6 100644
--- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -32,12 +33,11 @@ import org.apache.cassandra.utils.UUIDSerializer;
*/
public class CleanupMessage extends RepairMessage
{
- public static MessageSerializer serializer = new CleanupMessageSerializer();
public final UUID parentRepairSession;
public CleanupMessage(UUID parentRepairSession)
{
- super(Type.CLEANUP, null);
+ super(null);
this.parentRepairSession = parentRepairSession;
}
@@ -47,17 +47,16 @@ public class CleanupMessage extends RepairMessage
if (!(o instanceof CleanupMessage))
return false;
CleanupMessage other = (CleanupMessage) o;
- return messageType == other.messageType &&
- parentRepairSession.equals(other.parentRepairSession);
+ return parentRepairSession.equals(other.parentRepairSession);
}
@Override
public int hashCode()
{
- return Objects.hash(messageType, parentRepairSession);
+ return Objects.hash(parentRepairSession);
}
- public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage>
+ public static final IVersionedSerializer<CleanupMessage> serializer = new IVersionedSerializer<CleanupMessage>()
{
public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException
{
@@ -74,5 +73,5 @@ public class CleanupMessage extends RepairMessage
{
return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
}
- }
+ };
}
diff --git a/src/java/org/apache/cassandra/repair/messages/FailSession.java b/src/java/org/apache/cassandra/repair/messages/FailSession.java
index 1227cc3..b8c7ad3 100644
--- a/src/java/org/apache/cassandra/repair/messages/FailSession.java
+++ b/src/java/org/apache/cassandra/repair/messages/FailSession.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.util.UUID;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class FailSession extends RepairMessage
public FailSession(UUID sessionID)
{
- super(Type.FAILED_SESSION, null);
+ super(null);
assert sessionID != null;
this.sessionID = sessionID;
}
@@ -51,7 +52,7 @@ public class FailSession extends RepairMessage
return sessionID.hashCode();
}
- public static final MessageSerializer serializer = new MessageSerializer<FailSession>()
+ public static final IVersionedSerializer<FailSession> serializer = new IVersionedSerializer<FailSession>()
{
public void serialize(FailSession msg, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
index a4eb111..bb5cca7 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.util.UUID;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class FinalizeCommit extends RepairMessage
public FinalizeCommit(UUID sessionID)
{
- super(Type.FINALIZE_COMMIT, null);
+ super(null);
assert sessionID != null;
this.sessionID = sessionID;
}
@@ -58,7 +59,7 @@ public class FinalizeCommit extends RepairMessage
'}';
}
- public static MessageSerializer serializer = new MessageSerializer<FinalizeCommit>()
+ public static final IVersionedSerializer<FinalizeCommit> serializer = new IVersionedSerializer<FinalizeCommit>()
{
public void serialize(FinalizeCommit msg, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index 07e7e0d..cfdc07c 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,7 +38,7 @@ public class FinalizePromise extends RepairMessage
public FinalizePromise(UUID sessionID, InetAddressAndPort participant, boolean promised)
{
- super(Type.FINALIZE_PROMISE, null);
+ super(null);
assert sessionID != null;
assert participant != null;
this.sessionID = sessionID;
@@ -65,7 +66,7 @@ public class FinalizePromise extends RepairMessage
return result;
}
- public static MessageSerializer serializer = new MessageSerializer<FinalizePromise>()
+ public static final IVersionedSerializer<FinalizePromise> serializer = new IVersionedSerializer<FinalizePromise>()
{
public void serialize(FinalizePromise msg, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
index c0c49df..c21dd78 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.util.UUID;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class FinalizePropose extends RepairMessage
public FinalizePropose(UUID sessionID)
{
- super(Type.FINALIZE_PROPOSE, null);
+ super(null);
assert sessionID != null;
this.sessionID = sessionID;
}
@@ -58,7 +59,7 @@ public class FinalizePropose extends RepairMessage
'}';
}
- public static MessageSerializer serializer = new MessageSerializer<FinalizePropose>()
+ public static final IVersionedSerializer<FinalizePropose> serializer = new IVersionedSerializer<FinalizePropose>()
{
public void serialize(FinalizePropose msg, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index b1e9b04..c1be082 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -41,7 +42,7 @@ public class PrepareConsistentRequest extends RepairMessage
public PrepareConsistentRequest(UUID parentSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> participants)
{
- super(Type.CONSISTENT_REQUEST, null);
+ super(null);
assert parentSession != null;
assert coordinator != null;
assert participants != null && !participants.isEmpty();
@@ -79,9 +80,8 @@ public class PrepareConsistentRequest extends RepairMessage
'}';
}
- public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentRequest>()
+ public static final IVersionedSerializer<PrepareConsistentRequest> serializer = new IVersionedSerializer<PrepareConsistentRequest>()
{
-
public void serialize(PrepareConsistentRequest request, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(request.parentSession, out, version);
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index 3362a40..00de77d 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,7 +38,7 @@ public class PrepareConsistentResponse extends RepairMessage
public PrepareConsistentResponse(UUID parentSession, InetAddressAndPort participant, boolean success)
{
- super(Type.CONSISTENT_RESPONSE, null);
+ super(null);
assert parentSession != null;
assert participant != null;
this.parentSession = parentSession;
@@ -65,7 +66,7 @@ public class PrepareConsistentResponse extends RepairMessage
return result;
}
- public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentResponse>()
+ public static final IVersionedSerializer<PrepareConsistentResponse> serializer = new IVersionedSerializer<PrepareConsistentResponse>()
{
public void serialize(PrepareConsistentResponse response, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 5a0701c..9c485bc 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -24,10 +24,13 @@ import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import com.google.common.base.Preconditions;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
@@ -38,7 +41,6 @@ import org.apache.cassandra.utils.UUIDSerializer;
public class PrepareMessage extends RepairMessage
{
- public final static MessageSerializer serializer = new PrepareMessageSerializer();
public final List<TableId> tableIds;
public final Collection<Range<Token>> ranges;
@@ -50,7 +52,7 @@ public class PrepareMessage extends RepairMessage
public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal, PreviewKind previewKind)
{
- super(Type.PREPARE_MESSAGE, null);
+ super(null);
this.parentRepairSession = parentRepairSession;
this.tableIds = tableIds;
this.ranges = ranges;
@@ -66,8 +68,7 @@ public class PrepareMessage extends RepairMessage
if (!(o instanceof PrepareMessage))
return false;
PrepareMessage other = (PrepareMessage) o;
- return messageType == other.messageType &&
- parentRepairSession.equals(other.parentRepairSession) &&
+ return parentRepairSession.equals(other.parentRepairSession) &&
isIncremental == other.isIncremental &&
isGlobal == other.isGlobal &&
previewKind == other.previewKind &&
@@ -79,13 +80,18 @@ public class PrepareMessage extends RepairMessage
@Override
public int hashCode()
{
- return Objects.hash(messageType, parentRepairSession, isGlobal, previewKind, isIncremental, timestamp, tableIds, ranges);
+ return Objects.hash(parentRepairSession, isGlobal, previewKind, isIncremental, timestamp, tableIds, ranges);
}
- public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage>
+ private static final String MIXED_MODE_ERROR = "Some nodes involved in repair are on an incompatible major version. " +
+ "Repair is not supported in mixed major version clusters.";
+
+ public static final IVersionedSerializer<PrepareMessage> serializer = new IVersionedSerializer<PrepareMessage>()
{
public void serialize(PrepareMessage message, DataOutputPlus out, int version) throws IOException
{
+ Preconditions.checkArgument(version == MessagingService.current_version, MIXED_MODE_ERROR);
+
out.writeInt(message.tableIds.size());
for (TableId tableId : message.tableIds)
tableId.serialize(out);
@@ -104,6 +110,8 @@ public class PrepareMessage extends RepairMessage
public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException
{
+ Preconditions.checkArgument(version == MessagingService.current_version, MIXED_MODE_ERROR);
+
int tableIdCount = in.readInt();
List<TableId> tableIds = new ArrayList<>(tableIdCount);
for (int i = 0; i < tableIdCount; i++)
@@ -122,6 +130,8 @@ public class PrepareMessage extends RepairMessage
public long serializedSize(PrepareMessage message, int version)
{
+ Preconditions.checkArgument(version == MessagingService.current_version, MIXED_MODE_ERROR);
+
long size;
size = TypeSizes.sizeof(message.tableIds.size());
for (TableId tableId : message.tableIds)
@@ -136,7 +146,7 @@ public class PrepareMessage extends RepairMessage
size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
return size;
}
- }
+ };
@Override
public String toString()
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index db1a134..3137b4e 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -17,14 +17,6 @@
*/
package org.apache.cassandra.repair.messages;
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairJobDesc;
/**
@@ -34,85 +26,10 @@ import org.apache.cassandra.repair.RepairJobDesc;
*/
public abstract class RepairMessage
{
- public static final IVersionedSerializer<RepairMessage> serializer = new RepairMessageSerializer();
-
- public static interface MessageSerializer<T extends RepairMessage> extends IVersionedSerializer<T> {}
-
- public static final int MIN_MESSAGING_VERSION = MessagingService.VERSION_40;
- private static final String MIXED_MODE_ERROR = "Some nodes involved in repair are on an incompatible major version. " +
- "Repair is not supported in mixed major version clusters.";
-
- public enum Type
- {
- VALIDATION_REQUEST(0, ValidationRequest.serializer),
- VALIDATION_COMPLETE(1, ValidationComplete.serializer),
- SYNC_REQUEST(2, SyncRequest.serializer),
- SYNC_COMPLETE(3, SyncComplete.serializer),
- PREPARE_MESSAGE(5, PrepareMessage.serializer),
- SNAPSHOT(6, SnapshotMessage.serializer),
- CLEANUP(7, CleanupMessage.serializer),
-
- CONSISTENT_REQUEST(8, PrepareConsistentRequest.serializer),
- CONSISTENT_RESPONSE(9, PrepareConsistentResponse.serializer),
- FINALIZE_PROPOSE(10, FinalizePropose.serializer),
- FINALIZE_PROMISE(11, FinalizePromise.serializer),
- FINALIZE_COMMIT(12, FinalizeCommit.serializer),
- FAILED_SESSION(13, FailSession.serializer),
- STATUS_REQUEST(14, StatusRequest.serializer),
- STATUS_RESPONSE(15, StatusResponse.serializer),
- ASYMMETRIC_SYNC_REQUEST(16, AsymmetricSyncRequest.serializer);
-
- private final byte type;
- private final MessageSerializer<RepairMessage> serializer;
-
- Type(int type, MessageSerializer<RepairMessage> serializer)
- {
- this.type = (byte) type;
- this.serializer = serializer;
- }
-
- public static Type fromByte(byte b)
- {
- for (Type t : values())
- {
- if (t.type == b)
- return t;
- }
- throw new IllegalArgumentException("Unknown RepairMessage.Type: " + b);
- }
- }
-
- public final Type messageType;
public final RepairJobDesc desc;
- protected RepairMessage(Type messageType, RepairJobDesc desc)
+ protected RepairMessage(RepairJobDesc desc)
{
- this.messageType = messageType;
this.desc = desc;
}
-
- public static class RepairMessageSerializer implements MessageSerializer<RepairMessage>
- {
- public void serialize(RepairMessage message, DataOutputPlus out, int version) throws IOException
- {
- Preconditions.checkArgument(version >= MIN_MESSAGING_VERSION, MIXED_MODE_ERROR);
- out.write(message.messageType.type);
- message.messageType.serializer.serialize(message, out, version);
- }
-
- public RepairMessage deserialize(DataInputPlus in, int version) throws IOException
- {
- Preconditions.checkArgument(version >= MIN_MESSAGING_VERSION, MIXED_MODE_ERROR);
- RepairMessage.Type messageType = RepairMessage.Type.fromByte(in.readByte());
- return messageType.serializer.deserialize(in, version);
- }
-
- public long serializedSize(RepairMessage message, int version)
- {
- Preconditions.checkArgument(version >= MIN_MESSAGING_VERSION, MIXED_MODE_ERROR);
- long size = 1; // for messageType byte
- size += message.messageType.serializer.serializedSize(message, version);
- return size;
- }
- }
}
diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
index d4737d3..c18950a 100644
--- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
@@ -20,17 +20,16 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.util.Objects;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.repair.RepairJobDesc;
public class SnapshotMessage extends RepairMessage
{
- public final static MessageSerializer serializer = new SnapshotMessageSerializer();
-
public SnapshotMessage(RepairJobDesc desc)
{
- super(Type.SNAPSHOT, desc);
+ super(desc);
}
@Override
@@ -39,16 +38,16 @@ public class SnapshotMessage extends RepairMessage
if (!(o instanceof SnapshotMessage))
return false;
SnapshotMessage other = (SnapshotMessage) o;
- return messageType == other.messageType;
+ return desc.equals(other.desc);
}
@Override
public int hashCode()
{
- return Objects.hash(messageType);
+ return Objects.hash(desc);
}
- public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage>
+ public static final IVersionedSerializer<SnapshotMessage> serializer = new IVersionedSerializer<SnapshotMessage>()
{
public void serialize(SnapshotMessage message, DataOutputPlus out, int version) throws IOException
{
@@ -65,5 +64,5 @@ public class SnapshotMessage extends RepairMessage
{
return RepairJobDesc.serializer.serializedSize(message.desc, version);
}
- }
+ };
}
diff --git a/src/java/org/apache/cassandra/repair/messages/StatusRequest.java b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
index f6a2b82..09354e6 100644
--- a/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.util.UUID;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class StatusRequest extends RepairMessage
public StatusRequest(UUID sessionID)
{
- super(Type.STATUS_REQUEST, null);
+ super(null);
this.sessionID = sessionID;
}
@@ -57,7 +58,7 @@ public class StatusRequest extends RepairMessage
'}';
}
- public static MessageSerializer serializer = new MessageSerializer<StatusRequest>()
+ public static final IVersionedSerializer<StatusRequest> serializer = new IVersionedSerializer<StatusRequest>()
{
public void serialize(StatusRequest msg, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/StatusResponse.java b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
index 99eb76b..e62d337 100644
--- a/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.repair.consistent.ConsistentSession;
@@ -34,7 +35,7 @@ public class StatusResponse extends RepairMessage
public StatusResponse(UUID sessionID, ConsistentSession.State state)
{
- super(Type.STATUS_RESPONSE, null);
+ super(null);
assert sessionID != null;
assert state != null;
this.sessionID = sessionID;
@@ -67,7 +68,7 @@ public class StatusResponse extends RepairMessage
'}';
}
- public static final MessageSerializer serializer = new MessageSerializer<StatusResponse>()
+ public static final IVersionedSerializer<StatusResponse> serializer = new IVersionedSerializer<StatusResponse>()
{
public void serialize(StatusResponse msg, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 71fcdb0..341455f 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -44,8 +45,6 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
*/
public class SyncRequest extends RepairMessage
{
- public static MessageSerializer serializer = new SyncRequestSerializer();
-
public final InetAddressAndPort initiator;
public final InetAddressAndPort src;
public final InetAddressAndPort dst;
@@ -54,7 +53,7 @@ public class SyncRequest extends RepairMessage
public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
{
- super(Type.SYNC_REQUEST, desc);
+ super(desc);
this.initiator = initiator;
this.src = src;
this.dst = dst;
@@ -68,8 +67,7 @@ public class SyncRequest extends RepairMessage
if (!(o instanceof SyncRequest))
return false;
SyncRequest req = (SyncRequest)o;
- return messageType == req.messageType &&
- desc.equals(req.desc) &&
+ return desc.equals(req.desc) &&
initiator.equals(req.initiator) &&
src.equals(req.src) &&
dst.equals(req.dst) &&
@@ -80,10 +78,10 @@ public class SyncRequest extends RepairMessage
@Override
public int hashCode()
{
- return Objects.hash(messageType, desc, initiator, src, dst, ranges, previewKind);
+ return Objects.hash(desc, initiator, src, dst, ranges, previewKind);
}
- public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
+ public static final IVersionedSerializer<SyncRequest> serializer = new IVersionedSerializer<SyncRequest>()
{
public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException
{
@@ -124,7 +122,7 @@ public class SyncRequest extends RepairMessage
size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
return size;
}
- }
+ };
@Override
public String toString()
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
similarity index 79%
rename from src/java/org/apache/cassandra/repair/messages/SyncComplete.java
rename to src/java/org/apache/cassandra/repair/messages/SyncResponse.java
index c51d1fd..e7e7985 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -34,10 +35,8 @@ import org.apache.cassandra.streaming.SessionSummary;
*
* @since 2.0
*/
-public class SyncComplete extends RepairMessage
+public class SyncResponse extends RepairMessage
{
- public static final MessageSerializer serializer = new SyncCompleteSerializer();
-
/** nodes that involved in this sync */
public final SyncNodePair nodes;
/** true if sync success, false otherwise */
@@ -45,17 +44,17 @@ public class SyncComplete extends RepairMessage
public final List<SessionSummary> summaries;
- public SyncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
+ public SyncResponse(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
{
- super(Type.SYNC_COMPLETE, desc);
+ super(desc);
this.nodes = nodes;
this.success = success;
this.summaries = summaries;
}
- public SyncComplete(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
+ public SyncResponse(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
{
- super(Type.SYNC_COMPLETE, desc);
+ super(desc);
this.summaries = summaries;
this.nodes = new SyncNodePair(endpoint1, endpoint2);
this.success = success;
@@ -64,11 +63,10 @@ public class SyncComplete extends RepairMessage
@Override
public boolean equals(Object o)
{
- if (!(o instanceof SyncComplete))
+ if (!(o instanceof SyncResponse))
return false;
- SyncComplete other = (SyncComplete)o;
- return messageType == other.messageType &&
- desc.equals(other.desc) &&
+ SyncResponse other = (SyncResponse)o;
+ return desc.equals(other.desc) &&
success == other.success &&
nodes.equals(other.nodes) &&
summaries.equals(other.summaries);
@@ -77,12 +75,12 @@ public class SyncComplete extends RepairMessage
@Override
public int hashCode()
{
- return Objects.hash(messageType, desc, success, nodes, summaries);
+ return Objects.hash(desc, success, nodes, summaries);
}
- private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
+ public static final IVersionedSerializer<SyncResponse> serializer = new IVersionedSerializer<SyncResponse>()
{
- public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException
+ public void serialize(SyncResponse message, DataOutputPlus out, int version) throws IOException
{
RepairJobDesc.serializer.serialize(message.desc, out, version);
SyncNodePair.serializer.serialize(message.nodes, out, version);
@@ -95,7 +93,7 @@ public class SyncComplete extends RepairMessage
}
}
- public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
+ public SyncResponse deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, version);
@@ -108,10 +106,10 @@ public class SyncComplete extends RepairMessage
summaries.add(SessionSummary.serializer.deserialize(in, version));
}
- return new SyncComplete(desc, nodes, success, summaries);
+ return new SyncResponse(desc, nodes, success, summaries);
}
- public long serializedSize(SyncComplete message, int version)
+ public long serializedSize(SyncResponse message, int version)
{
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
size += SyncNodePair.serializer.serializedSize(message.nodes, version);
@@ -125,5 +123,5 @@ public class SyncComplete extends RepairMessage
return size;
}
- }
+ };
}
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
index 6466244..f9a1f4e 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.repair.RepairJobDesc;
@@ -31,13 +32,11 @@ import org.apache.cassandra.repair.RepairJobDesc;
*/
public class ValidationRequest extends RepairMessage
{
- public static MessageSerializer serializer = new ValidationRequestSerializer();
-
public final int nowInSec;
public ValidationRequest(RepairJobDesc desc, int nowInSec)
{
- super(Type.VALIDATION_REQUEST, desc);
+ super(desc);
this.nowInSec = nowInSec;
}
@@ -65,7 +64,7 @@ public class ValidationRequest extends RepairMessage
return nowInSec;
}
- public static class ValidationRequestSerializer implements MessageSerializer<ValidationRequest>
+ public static final IVersionedSerializer<ValidationRequest> serializer = new IVersionedSerializer<ValidationRequest>()
{
public void serialize(ValidationRequest message, DataOutputPlus out, int version) throws IOException
{
@@ -85,5 +84,5 @@ public class ValidationRequest extends RepairMessage
size += TypeSizes.sizeof(message.nowInSec);
return size;
}
- }
+ };
}
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationResponse.java
similarity index 70%
rename from src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
rename to src/java/org/apache/cassandra/repair/messages/ValidationResponse.java
index b8aa736..d9f4467 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationResponse.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.repair.RepairJobDesc;
@@ -31,22 +32,20 @@ import org.apache.cassandra.utils.MerkleTrees;
*
* @since 2.0
*/
-public class ValidationComplete extends RepairMessage
+public class ValidationResponse extends RepairMessage
{
- public static MessageSerializer serializer = new ValidationCompleteSerializer();
-
/** Merkle hash tree response. Null if validation failed. */
public final MerkleTrees trees;
- public ValidationComplete(RepairJobDesc desc)
+ public ValidationResponse(RepairJobDesc desc)
{
- super(Type.VALIDATION_COMPLETE, desc);
+ super(desc);
trees = null;
}
- public ValidationComplete(RepairJobDesc desc, MerkleTrees trees)
+ public ValidationResponse(RepairJobDesc desc, MerkleTrees trees)
{
- super(Type.VALIDATION_COMPLETE, desc);
+ super(desc);
assert trees != null;
this.trees = trees;
}
@@ -57,34 +56,33 @@ public class ValidationComplete extends RepairMessage
}
/**
- * @return a new {@link ValidationComplete} instance with all trees moved off heap, or {@code this}
+ * @return a new {@link ValidationResponse} instance with all trees moved off heap, or {@code this}
* if it's a failure response.
*/
- public ValidationComplete tryMoveOffHeap() throws IOException
+ public ValidationResponse tryMoveOffHeap() throws IOException
{
- return trees == null ? this : new ValidationComplete(desc, trees.tryMoveOffHeap());
+ return trees == null ? this : new ValidationResponse(desc, trees.tryMoveOffHeap());
}
@Override
public boolean equals(Object o)
{
- if (!(o instanceof ValidationComplete))
+ if (!(o instanceof ValidationResponse))
return false;
- ValidationComplete other = (ValidationComplete)o;
- return messageType == other.messageType &&
- desc.equals(other.desc);
+ ValidationResponse other = (ValidationResponse)o;
+ return desc.equals(other.desc);
}
@Override
public int hashCode()
{
- return Objects.hash(messageType, desc);
+ return Objects.hash(desc);
}
- private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
+ public static final IVersionedSerializer<ValidationResponse> serializer = new IVersionedSerializer<ValidationResponse>()
{
- public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException
+ public void serialize(ValidationResponse message, DataOutputPlus out, int version) throws IOException
{
RepairJobDesc.serializer.serialize(message.desc, out, version);
out.writeBoolean(message.success());
@@ -92,7 +90,7 @@ public class ValidationComplete extends RepairMessage
MerkleTrees.serializer.serialize(message.trees, out, version);
}
- public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException
+ public ValidationResponse deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
boolean success = in.readBoolean();
@@ -100,13 +98,13 @@ public class ValidationComplete extends RepairMessage
if (success)
{
MerkleTrees trees = MerkleTrees.serializer.deserialize(in, version);
- return new ValidationComplete(desc, trees);
+ return new ValidationResponse(desc, trees);
}
- return new ValidationComplete(desc);
+ return new ValidationResponse(desc);
}
- public long serializedSize(ValidationComplete message, int version)
+ public long serializedSize(ValidationResponse message, int version)
{
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
size += TypeSizes.sizeof(message.success());
@@ -114,5 +112,5 @@ public class ValidationComplete extends RepairMessage
size += MerkleTrees.serializer.serializedSize(message.trees, version);
return size;
}
- }
+ };
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 409d799..6f4c474 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -78,7 +78,7 @@ import org.apache.cassandra.utils.UUIDGen;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.transform;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.PREPARE_MSG;
/**
* ActiveRepairService is the starting point for manual "active" repairs.
@@ -112,8 +112,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
private boolean registeredForEndpointChanges = false;
- public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
-
private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
// singleton enforcement
public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
@@ -437,7 +435,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (FailureDetector.instance.isAlive(neighbour))
{
PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
- Message<RepairMessage> msg = Message.out(REPAIR_REQ, message);
+ Message<RepairMessage> msg = Message.out(PREPARE_MSG, message);
MessagingService.instance().sendWithCallback(msg, neighbour, callback);
}
else
@@ -527,21 +525,21 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return parentRepairSessions.remove(parentSessionId);
}
- public void handleMessage(InetAddressAndPort endpoint, RepairMessage message)
+ public void handleMessage(Message<? extends RepairMessage> message)
{
- RepairJobDesc desc = message.desc;
+ RepairJobDesc desc = message.payload.desc;
RepairSession session = sessions.get(desc.sessionId);
if (session == null)
return;
- switch (message.messageType)
+ switch (message.verb())
{
- case VALIDATION_COMPLETE:
- ValidationComplete validation = (ValidationComplete) message;
- session.validationComplete(desc, endpoint, validation.trees);
+ case VALIDATION_RSP:
+ ValidationResponse validation = (ValidationResponse) message.payload;
+ session.validationComplete(desc, message.from(), validation.trees);
break;
- case SYNC_COMPLETE:
+ case SYNC_RSP:
// one of replica is synced.
- SyncComplete sync = (SyncComplete) message;
+ SyncResponse sync = (SyncResponse) message.payload;
session.syncComplete(desc, sync.nodes, sync.success, sync.summaries);
break;
default:
diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin
index 849faf2..4e8caa6 100644
Binary files a/test/data/serialization/4.0/service.SyncComplete.bin and b/test/data/serialization/4.0/service.SyncComplete.bin differ
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
index f5f2e3d..b0cc44e 100644
Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ
diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin
index 3f5b318..7402c9e 100644
Binary files a/test/data/serialization/4.0/service.ValidationComplete.bin and b/test/data/serialization/4.0/service.ValidationComplete.bin differ
diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin
index 72d4a1f..fa4a913 100644
Binary files a/test/data/serialization/4.0/service.ValidationRequest.bin and b/test/data/serialization/4.0/service.ValidationRequest.bin differ
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index b84adaa..068544d 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -181,14 +182,14 @@ public class RepairJobTest
assertEquals(0, result.stats.size());
// RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done
- List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+ List<Verb> expectedTypes = new ArrayList<>();
for (int i = 0; i < 3; i++)
- expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+ expectedTypes.add(Verb.SNAPSHOT_MSG);
for (int i = 0; i < 3; i++)
- expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+ expectedTypes.add(Verb.VALIDATION_REQ);
assertEquals(expectedTypes, observedMessages.stream()
- .map(k -> ((RepairMessage) k.payload).messageType)
+ .map(Message::verb)
.collect(Collectors.toList()));
}
@@ -251,7 +252,7 @@ public class RepairJobTest
assertTrue(results.stream().allMatch(s -> s.numberOfDifferences == 1));
assertEquals(2, messages.size());
- assertTrue(messages.stream().allMatch(m -> ((RepairMessage) m.payload).messageType == RepairMessage.Type.SYNC_REQUEST));
+ assertTrue(messages.stream().allMatch(m -> m.verb() == Verb.SYNC_REQ));
}
@Test
@@ -800,17 +801,16 @@ public class RepairJobTest
messageCapture.add(message);
}
- RepairMessage rm = (RepairMessage) message.payload;
- switch (rm.messageType)
+ switch (message.verb())
{
- case SNAPSHOT:
+ case SNAPSHOT_MSG:
MessagingService.instance().callbacks.removeAndRespond(message.id(), to, message.emptyResponse());
break;
- case VALIDATION_REQUEST:
+ case VALIDATION_REQ:
session.validationComplete(sessionJobDesc, to, mockTrees.get(to));
break;
- case SYNC_REQUEST:
- SyncRequest syncRequest = (SyncRequest) rm;
+ case SYNC_REQ:
+ SyncRequest syncRequest = (SyncRequest) message.payload;
session.syncComplete(sessionJobDesc, new SyncNodePair(syncRequest.src, syncRequest.dst),
true, Collections.emptyList());
break;
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
index 7f48788..cba64ae 100644
--- a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
@@ -48,7 +48,7 @@ public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
InetAddressAndPort sentTo = null;
@Override
- void sendRequest(RepairMessage request, InetAddressAndPort to)
+ void sendRequest(SyncRequest request, InetAddressAndPort to)
{
Assert.assertNull(sentMessage);
Assert.assertNotNull(request);
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 9e848a9..a288edb 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -51,8 +51,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.RepairMessage;
-import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
@@ -131,12 +130,11 @@ public class ValidatorTest
assertNotNull(tree.hash(new Range<>(min, min)));
Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- assertEquals(Verb.REPAIR_REQ, message.verb());
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(Verb.VALIDATION_RSP, message.verb());
+ ValidationResponse m = (ValidationResponse) message.payload;
assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success());
- assertNotNull(((ValidationComplete) m).trees);
+ assertTrue(m.success());
+ assertNotNull(m.trees);
}
@@ -154,12 +152,11 @@ public class ValidatorTest
validator.fail();
Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- assertEquals(Verb.REPAIR_REQ, message.verb());
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(Verb.VALIDATION_RSP, message.verb());
+ ValidationResponse m = (ValidationResponse) message.payload;
assertEquals(desc, m.desc);
- assertFalse(((ValidationComplete) m).success());
- assertNull(((ValidationComplete) m).trees);
+ assertFalse(m.success());
+ assertNull(m.trees);
}
@Test
@@ -214,19 +211,17 @@ public class ValidatorTest
ValidationManager.instance.submitValidation(cfs, validator);
Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- assertEquals(Verb.REPAIR_REQ, message.verb());
- RepairMessage m = (RepairMessage) message.payload;
- assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+ assertEquals(Verb.VALIDATION_RSP, message.verb());
+ ValidationResponse m = (ValidationResponse) message.payload;
assertEquals(desc, m.desc);
- assertTrue(((ValidationComplete) m).success());
- MerkleTrees trees = ((ValidationComplete) m).trees;
+ assertTrue(m.success());
- Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
+ Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = m.trees.iterator();
while (iterator.hasNext())
{
assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
}
- assertEquals(trees.rowCount(), n);
+ assertEquals(m.trees.rowCount(), n);
}
/*
@@ -273,7 +268,7 @@ public class ValidatorTest
ValidationManager.instance.submitValidation(cfs, validator);
Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- MerkleTrees trees = ((ValidationComplete) message.payload).trees;
+ MerkleTrees trees = ((ValidationResponse) message.payload).trees;
Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
int numTrees = 0;
@@ -335,7 +330,7 @@ public class ValidatorTest
ValidationManager.instance.submitValidation(cfs, validator);
Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- MerkleTrees trees = ((ValidationComplete) message.payload).trees;
+ MerkleTrees trees = ((ValidationResponse) message.payload).trees;
// Should have 4 trees each with a depth of on average 10 (since each range should have gotten 0.25 megabytes)
Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index 237e9a8..c9fd913 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import static org.apache.cassandra.net.MockMessagingService.all;
-import static org.apache.cassandra.net.MockMessagingService.payload;
import static org.apache.cassandra.net.MockMessagingService.to;
import static org.apache.cassandra.net.MockMessagingService.verb;
import static org.junit.Assert.fail;
@@ -275,45 +274,36 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
Collection<InetAddressAndPort> timeout,
Function<PrepareConsistentRequest, UUID> sessionIdFunc)
{
- return MockMessagingService.when(
- all(verb(Verb.REPAIR_REQ),
- payload((p) -> p instanceof PrepareConsistentRequest))
- ).respond((msgOut, to) ->
- {
- if(timeout.contains(to)) return null;
- else return Message.out(Verb.REPAIR_REQ, new PrepareConsistentResponse(sessionIdFunc.apply((PrepareConsistentRequest) msgOut.payload), to, !failed.contains(to)));
- });
+ return MockMessagingService.when(verb(Verb.PREPARE_CONSISTENT_REQ)).respond((msgOut, to) ->
+ {
+ if (timeout.contains(to))
+ return null;
+
+ return Message.out(Verb.PREPARE_CONSISTENT_RSP,
+ new PrepareConsistentResponse(sessionIdFunc.apply((PrepareConsistentRequest) msgOut.payload), to, !failed.contains(to)));
+ });
}
private MockMessagingSpy createFinalizeSpy(Collection<InetAddressAndPort> failed,
Collection<InetAddressAndPort> timeout)
{
- return MockMessagingService.when(
- all(verb(Verb.REPAIR_REQ),
- payload((p) -> p instanceof FinalizePropose))
- ).respond((msgOut, to) ->
- {
- if(timeout.contains(to)) return null;
- else return Message.out(Verb.REPAIR_REQ,
- new FinalizePromise(((FinalizePropose) msgOut.payload).sessionID, to, !failed.contains(to)));
- });
+ return MockMessagingService.when(verb(Verb.FINALIZE_PROPOSE_MSG)).respond((msgOut, to) ->
+ {
+ if (timeout.contains(to))
+ return null;
+
+ return Message.out(Verb.FINALIZE_PROMISE_MSG, new FinalizePromise(((FinalizePropose) msgOut.payload).sessionID, to, !failed.contains(to)));
+ });
}
private MockMessagingSpy createCommitSpy()
{
- return MockMessagingService.when(
- all(verb(Verb.REPAIR_REQ),
- payload((p) -> p instanceof FinalizeCommit))
- ).dontReply();
+ return MockMessagingService.when(verb(Verb.FINALIZE_COMMIT_MSG)).dontReply();
}
private MockMessagingSpy createFailSessionSpy(Collection<InetAddressAndPort> participants)
{
- return MockMessagingService.when(
- all(verb(Verb.REPAIR_REQ),
- payload((p) -> p instanceof FailSession),
- to(participants::contains))
- ).dontReply();
+ return MockMessagingService.when(all(verb(Verb.FAILED_SESSION_MSG), to(participants::contains))).dontReply();
}
private static RepairSessionResult createResult(CoordinatorSession coordinator)
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index c6980fe..1cee312 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -35,6 +35,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.messages.FailSession;
@@ -93,13 +94,13 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
- protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+ protected void sendMessage(InetAddressAndPort destination, Message<RepairMessage> message)
{
if (!sentMessages.containsKey(destination))
{
sentMessages.put(destination, new ArrayList<>());
}
- sentMessages.get(destination).add(message);
+ sentMessages.get(destination).add(message.payload);
}
Runnable onSetRepairing = null;
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index a6b4fe2..15fd1fc 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.KeyspaceRepairManager;
@@ -123,13 +124,14 @@ public class LocalSessionTest extends AbstractRepairTest
static class InstrumentedLocalSessions extends LocalSessions
{
Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
- protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+
+ protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message)
{
if (!sentMessages.containsKey(destination))
{
sentMessages.put(destination, new ArrayList<>());
}
- sentMessages.get(destination).add(message);
+ sentMessages.get(destination).add(message.payload);
}
SettableFuture<Object> prepareSessionFuture = null;
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index d583d85..fa037a0 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -118,7 +118,7 @@ public class RepairMessageSerializationsTest
@Test
public void validationCompleteMessage_NoMerkleTree() throws IOException
{
- ValidationComplete deserialized = validationCompleteMessage(null);
+ ValidationResponse deserialized = validationCompleteMessage(null);
Assert.assertNull(deserialized.trees);
}
@@ -127,19 +127,19 @@ public class RepairMessageSerializationsTest
{
MerkleTrees trees = new MerkleTrees(Murmur3Partitioner.instance);
trees.addMerkleTree(256, new Range<>(new LongToken(1000), new LongToken(1001)));
- ValidationComplete deserialized = validationCompleteMessage(trees);
+ ValidationResponse deserialized = validationCompleteMessage(trees);
// a simple check to make sure we got some merkle trees back.
Assert.assertEquals(trees.size(), deserialized.trees.size());
}
- private ValidationComplete validationCompleteMessage(MerkleTrees trees) throws IOException
+ private ValidationResponse validationCompleteMessage(MerkleTrees trees) throws IOException
{
RepairJobDesc jobDesc = buildRepairJobDesc();
- ValidationComplete msg = trees == null ?
- new ValidationComplete(jobDesc) :
- new ValidationComplete(jobDesc, trees);
- ValidationComplete deserialized = serializeRoundTrip(msg, ValidationComplete.serializer);
+ ValidationResponse msg = trees == null ?
+ new ValidationResponse(jobDesc) :
+ new ValidationResponse(jobDesc, trees);
+ ValidationResponse deserialized = serializeRoundTrip(msg, ValidationResponse.serializer);
return deserialized;
}
@@ -164,8 +164,8 @@ public class RepairMessageSerializationsTest
Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)),
Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10))
));
- SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new SyncNodePair(src, dst), true, summaries);
- serializeRoundTrip(msg, SyncComplete.serializer);
+ SyncResponse msg = new SyncResponse(buildRepairJobDesc(), new SyncNodePair(src, dst), true, summaries);
+ serializeRoundTrip(msg, SyncResponse.serializer);
}
@Test
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
index d876139..fedf498 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,16 +38,16 @@ public class RepairMessageSerializerTest
{
private static int MS_VERSION = MessagingService.current_version;
- static RepairMessage serdes(RepairMessage message)
+ private static <T extends RepairMessage> T serdes(IVersionedSerializer<T> serializer, T message)
{
- int expectedSize = (int) RepairMessage.serializer.serializedSize(message, MS_VERSION);
+ int expectedSize = (int) serializer.serializedSize(message, MS_VERSION);
try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
{
- RepairMessage.serializer.serialize(message, out, MS_VERSION);
+ serializer.serialize(message, out, MS_VERSION);
Assert.assertEquals(expectedSize, out.buffer().limit());
try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
{
- return RepairMessage.serializer.deserialize(in, MS_VERSION);
+ return serializer.deserialize(in, MS_VERSION);
}
}
catch (IOException e)
@@ -62,54 +63,50 @@ public class RepairMessageSerializerTest
InetAddressAndPort peer1 = InetAddressAndPort.getByName("10.0.0.2");
InetAddressAndPort peer2 = InetAddressAndPort.getByName("10.0.0.3");
InetAddressAndPort peer3 = InetAddressAndPort.getByName("10.0.0.4");
- RepairMessage expected = new PrepareConsistentRequest(UUIDGen.getTimeUUID(),
- coordinator,
- Sets.newHashSet(peer1, peer2, peer3));
- RepairMessage actual = serdes(expected);
+ PrepareConsistentRequest expected =
+ new PrepareConsistentRequest(UUIDGen.getTimeUUID(), coordinator, Sets.newHashSet(peer1, peer2, peer3));
+ PrepareConsistentRequest actual = serdes(PrepareConsistentRequest.serializer, expected);
Assert.assertEquals(expected, actual);
}
@Test
public void prepareConsistentResponse() throws Exception
{
- RepairMessage expected = new PrepareConsistentResponse(UUIDGen.getTimeUUID(),
- InetAddressAndPort.getByName("10.0.0.2"),
- true);
- RepairMessage actual = serdes(expected);
+ PrepareConsistentResponse expected =
+ new PrepareConsistentResponse(UUIDGen.getTimeUUID(), InetAddressAndPort.getByName("10.0.0.2"), true);
+ PrepareConsistentResponse actual = serdes(PrepareConsistentResponse.serializer, expected);
Assert.assertEquals(expected, actual);
}
@Test
public void failSession() throws Exception
{
- RepairMessage expected = new FailSession(UUIDGen.getTimeUUID());
- RepairMessage actual = serdes(expected);
+ FailSession expected = new FailSession(UUIDGen.getTimeUUID());
+ FailSession actual = serdes(FailSession.serializer, expected);
Assert.assertEquals(expected, actual);;
}
@Test
public void finalizeCommit() throws Exception
{
- RepairMessage expected = new FinalizeCommit(UUIDGen.getTimeUUID());
- RepairMessage actual = serdes(expected);
+ FinalizeCommit expected = new FinalizeCommit(UUIDGen.getTimeUUID());
+ FinalizeCommit actual = serdes(FinalizeCommit.serializer, expected);
Assert.assertEquals(expected, actual);;
}
@Test
public void finalizePromise() throws Exception
{
- RepairMessage expected = new FinalizePromise(UUIDGen.getTimeUUID(),
- InetAddressAndPort.getByName("10.0.0.2"),
- true);
- RepairMessage actual = serdes(expected);
+ FinalizePromise expected = new FinalizePromise(UUIDGen.getTimeUUID(), InetAddressAndPort.getByName("10.0.0.2"), true);
+ FinalizePromise actual = serdes(FinalizePromise.serializer, expected);
Assert.assertEquals(expected, actual);
}
@Test
public void finalizePropose() throws Exception
{
- RepairMessage expected = new FinalizePropose(UUIDGen.getTimeUUID());
- RepairMessage actual = serdes(expected);
+ FinalizePropose expected = new FinalizePropose(UUIDGen.getTimeUUID());
+ FinalizePropose actual = serdes(FinalizePropose.serializer, expected);
Assert.assertEquals(expected, actual);;
}
}
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 273d233..0a5a023 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -38,10 +38,10 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.SyncNodePair;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
@@ -79,14 +79,14 @@ public class SerializationsTest extends AbstractSerializationsTester
partitionerSwitcher.close();
}
- private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
+ private <T extends RepairMessage> void testRepairMessageWrite(String fileName, IVersionedSerializer<T> serializer, T... messages) throws IOException
{
try (DataOutputStreamPlus out = getOutput(fileName))
{
- for (RepairMessage message : messages)
+ for (T message : messages)
{
- testSerializedSize(message, RepairMessage.serializer);
- RepairMessage.serializer.serialize(message, out, getVersion());
+ testSerializedSize(message, serializer);
+ serializer.serialize(message, out, getVersion());
}
}
}
@@ -94,7 +94,7 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testValidationRequestWrite() throws IOException
{
ValidationRequest message = new ValidationRequest(DESC, 1234);
- testRepairMessageWrite("service.ValidationRequest.bin", message);
+ testRepairMessageWrite("service.ValidationRequest.bin", ValidationRequest.serializer, message);
}
@Test
@@ -105,10 +105,9 @@ public class SerializationsTest extends AbstractSerializationsTester
try (DataInputStreamPlus in = getInput("service.ValidationRequest.bin"))
{
- RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST;
+ ValidationRequest message = ValidationRequest.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
- assert ((ValidationRequest) message).nowInSec == 1234;
+ assert message.nowInSec == 1234;
}
}
@@ -121,7 +120,7 @@ public class SerializationsTest extends AbstractSerializationsTester
// empty validation
mt.addMerkleTree((int) Math.pow(2, 15), FULL_RANGE);
Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddressAndPort(), -1, PreviewKind.NONE);
- ValidationComplete c0 = new ValidationComplete(DESC, mt);
+ ValidationResponse c0 = new ValidationResponse(DESC, mt);
// validation with a tree
mt = new MerkleTrees(p);
@@ -129,12 +128,12 @@ public class SerializationsTest extends AbstractSerializationsTester
for (int i = 0; i < 10; i++)
mt.split(p.getRandomToken());
Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddressAndPort(), -1, PreviewKind.NONE);
- ValidationComplete c1 = new ValidationComplete(DESC, mt);
+ ValidationResponse c1 = new ValidationResponse(DESC, mt);
// validation failed
- ValidationComplete c3 = new ValidationComplete(DESC);
+ ValidationResponse c3 = new ValidationResponse(DESC);
- testRepairMessageWrite("service.ValidationComplete.bin", c0, c1, c3);
+ testRepairMessageWrite("service.ValidationComplete.bin", ValidationResponse.serializer, c0, c1, c3);
}
@Test
@@ -146,28 +145,25 @@ public class SerializationsTest extends AbstractSerializationsTester
try (DataInputStreamPlus in = getInput("service.ValidationComplete.bin"))
{
// empty validation
- RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+ ValidationResponse message = ValidationResponse.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
- assert ((ValidationComplete) message).success();
- assert ((ValidationComplete) message).trees != null;
+ assert message.success();
+ assert message.trees != null;
// validation with a tree
- message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+ message = ValidationResponse.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
- assert ((ValidationComplete) message).success();
- assert ((ValidationComplete) message).trees != null;
+ assert message.success();
+ assert message.trees != null;
// failed validation
- message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+ message = ValidationResponse.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
- assert !((ValidationComplete) message).success();
- assert ((ValidationComplete) message).trees == null;
+ assert !message.success();
+ assert message.trees == null;
}
}
@@ -178,7 +174,7 @@ public class SerializationsTest extends AbstractSerializationsTester
InetAddressAndPort dest = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.3", PORT);
SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE);
- testRepairMessageWrite("service.SyncRequest.bin", message);
+ testRepairMessageWrite("service.SyncRequest.bin", SyncRequest.serializer, message);
}
@Test
@@ -193,13 +189,12 @@ public class SerializationsTest extends AbstractSerializationsTester
try (DataInputStreamPlus in = getInput("service.SyncRequest.bin"))
{
- RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.SYNC_REQUEST;
+ SyncRequest message = SyncRequest.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
- assert local.equals(((SyncRequest) message).initiator);
- assert src.equals(((SyncRequest) message).src);
- assert dest.equals(((SyncRequest) message).dst);
- assert ((SyncRequest) message).ranges.size() == 1 && ((SyncRequest) message).ranges.contains(FULL_RANGE);
+ assert local.equals(message.initiator);
+ assert src.equals(message.src);
+ assert dest.equals(message.dst);
+ assert message.ranges.size() == 1 && message.ranges.contains(FULL_RANGE);
}
}
@@ -213,11 +208,11 @@ public class SerializationsTest extends AbstractSerializationsTester
Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)),
Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10))
));
- SyncComplete success = new SyncComplete(DESC, src, dest, true, summaries);
+ SyncResponse success = new SyncResponse(DESC, src, dest, true, summaries);
// sync fail
- SyncComplete fail = new SyncComplete(DESC, src, dest, false, Collections.emptyList());
+ SyncResponse fail = new SyncResponse(DESC, src, dest, false, Collections.emptyList());
- testRepairMessageWrite("service.SyncComplete.bin", success, fail);
+ testRepairMessageWrite("service.SyncComplete.bin", SyncResponse.serializer, success, fail);
}
@Test
@@ -233,22 +228,20 @@ public class SerializationsTest extends AbstractSerializationsTester
try (DataInputStreamPlus in = getInput("service.SyncComplete.bin"))
{
// success
- RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.SYNC_COMPLETE;
+ SyncResponse message = SyncResponse.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
System.out.println(nodes);
- System.out.println(((SyncComplete) message).nodes);
- assert nodes.equals(((SyncComplete) message).nodes);
- assert ((SyncComplete) message).success;
+ System.out.println(message.nodes);
+ assert nodes.equals(message.nodes);
+ assert message.success;
// fail
- message = RepairMessage.serializer.deserialize(in, getVersion());
- assert message.messageType == RepairMessage.Type.SYNC_COMPLETE;
+ message = SyncResponse.serializer.deserialize(in, getVersion());
assert DESC.equals(message.desc);
- assert nodes.equals(((SyncComplete) message).nodes);
- assert !((SyncComplete) message).success;
+ assert nodes.equals(message.nodes);
+ assert !message.success;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org