You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2019/09/20 22:52:48 UTC

[cassandra] branch trunk updated: Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly

This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9ff884  Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly
f9ff884 is described below

commit f9ff88437742675db5c53f5834884b43f8937e00
Author: Aleksey Yeschenko <al...@apache.org>
AuthorDate: Fri Jun 14 14:49:46 2019 +0100

    Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly
    
    patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
    CASSANDRA-15163
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/net/Verb.java        | 152 +++++++++++++--------
 .../cassandra/repair/AsymmetricRemoteSyncTask.java |   4 +-
 .../cassandra/repair/RepairMessageVerbHandler.java |  36 ++---
 .../org/apache/cassandra/repair/SnapshotTask.java  |   4 +-
 .../cassandra/repair/StreamingRepairTask.java      |  10 +-
 .../cassandra/repair/SymmetricRemoteSyncTask.java  |   6 +-
 .../apache/cassandra/repair/ValidationTask.java    |   4 +-
 .../org/apache/cassandra/repair/Validator.java     |  16 +--
 .../repair/consistent/CoordinatorSession.java      |  16 +--
 .../cassandra/repair/consistent/LocalSessions.java |  46 ++++---
 .../repair/messages/AsymmetricSyncRequest.java     |  14 +-
 .../cassandra/repair/messages/CleanupMessage.java  |  13 +-
 .../cassandra/repair/messages/FailSession.java     |   5 +-
 .../cassandra/repair/messages/FinalizeCommit.java  |   5 +-
 .../cassandra/repair/messages/FinalizePromise.java |   5 +-
 .../cassandra/repair/messages/FinalizePropose.java |   5 +-
 .../repair/messages/PrepareConsistentRequest.java  |   6 +-
 .../repair/messages/PrepareConsistentResponse.java |   5 +-
 .../cassandra/repair/messages/PrepareMessage.java  |  24 +++-
 .../cassandra/repair/messages/RepairMessage.java   |  85 +-----------
 .../cassandra/repair/messages/SnapshotMessage.java |  13 +-
 .../cassandra/repair/messages/StatusRequest.java   |   5 +-
 .../cassandra/repair/messages/StatusResponse.java  |   5 +-
 .../cassandra/repair/messages/SyncRequest.java     |  14 +-
 .../{SyncComplete.java => SyncResponse.java}       |  34 +++--
 .../repair/messages/ValidationRequest.java         |   9 +-
 ...dationComplete.java => ValidationResponse.java} |  42 +++---
 .../cassandra/service/ActiveRepairService.java     |  22 ++-
 .../serialization/4.0/service.SyncComplete.bin     | Bin 258 -> 256 bytes
 .../data/serialization/4.0/service.SyncRequest.bin | Bin 111 -> 110 bytes
 .../4.0/service.ValidationComplete.bin             | Bin 600 -> 597 bytes
 .../4.0/service.ValidationRequest.bin              | Bin 75 -> 74 bytes
 .../org/apache/cassandra/repair/RepairJobTest.java |  22 +--
 .../repair/SymmetricRemoteSyncTaskTest.java        |   2 +-
 .../org/apache/cassandra/repair/ValidatorTest.java |  37 +++--
 .../consistent/CoordinatorMessagingTest.java       |  44 +++---
 .../repair/consistent/CoordinatorSessionTest.java  |   5 +-
 .../repair/consistent/LocalSessionTest.java        |   6 +-
 .../messages/RepairMessageSerializationsTest.java  |  18 +--
 .../messages/RepairMessageSerializerTest.java      |  41 +++---
 .../cassandra/service/SerializationsTest.java      |  81 +++++------
 42 files changed, 400 insertions(+), 462 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2929a98..1a3df81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha2
+ * Untangle RepairMessage sub-hierarchy of messages, use new messaging (more) correctly (CASSANDRA-15163)
  * Add `allocate_tokens_for_local_replication_factor` option for token allocation (CASSANDRA-15260)
  * Add Alibaba Cloud Platform snitch (CASSANDRA-15092)
 
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 6463a5a..67d847e 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -56,7 +56,22 @@ import org.apache.cassandra.hints.HintMessage;
 import org.apache.cassandra.hints.HintVerbHandler;
 import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
 import org.apache.cassandra.repair.RepairMessageVerbHandler;
-import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.CleanupMessage;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.repair.messages.PrepareMessage;
+import org.apache.cassandra.repair.messages.SnapshotMessage;
+import org.apache.cassandra.repair.messages.StatusRequest;
+import org.apache.cassandra.repair.messages.StatusResponse;
+import org.apache.cassandra.repair.messages.SyncResponse;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.repair.messages.ValidationResponse;
+import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.schema.SchemaPullVerbHandler;
 import org.apache.cassandra.schema.SchemaPushVerbHandler;
 import org.apache.cassandra.schema.SchemaVersionVerbHandler;
@@ -84,75 +99,92 @@ import static org.apache.cassandra.schema.MigrationManager.MigrationsSerializer;
  */
 public enum Verb
 {
-    MUTATION_RSP         (60, P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    MUTATION_REQ         (0,  P3, writeTimeout,    MUTATION,          () -> Mutation.serializer,             () -> MutationVerbHandler.instance,        MUTATION_RSP        ),
-    HINT_RSP             (61, P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    HINT_REQ             (1,  P4, writeTimeout,    MUTATION,          () -> HintMessage.serializer,          () -> HintVerbHandler.instance,            HINT_RSP            ),
-    READ_REPAIR_RSP      (62, P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    READ_REPAIR_REQ      (2,  P1, writeTimeout,    MUTATION,          () -> Mutation.serializer,             () -> ReadRepairVerbHandler.instance,      READ_REPAIR_RSP     ),
-    BATCH_STORE_RSP      (65, P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    BATCH_STORE_REQ      (5,  P3, writeTimeout,    MUTATION,          () -> Batch.serializer,                () -> BatchStoreVerbHandler.instance,      BATCH_STORE_RSP     ),
-    BATCH_REMOVE_RSP     (66, P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    BATCH_REMOVE_REQ     (6,  P3, writeTimeout,    MUTATION,          () -> UUIDSerializer.serializer,       () -> BatchRemoveVerbHandler.instance,     BATCH_REMOVE_RSP    ),
-
-    PAXOS_PREPARE_RSP    (93, P2, writeTimeout,    REQUEST_RESPONSE,  () -> PrepareResponse.serializer,      () -> ResponseVerbHandler.instance                             ),
-    PAXOS_PREPARE_REQ    (33, P2, writeTimeout,    MUTATION,          () -> Commit.serializer,               () -> PrepareVerbHandler.instance,         PAXOS_PREPARE_RSP   ),
-    PAXOS_PROPOSE_RSP    (94, P2, writeTimeout,    REQUEST_RESPONSE,  () -> BooleanSerializer.serializer,    () -> ResponseVerbHandler.instance                             ),
-    PAXOS_PROPOSE_REQ    (34, P2, writeTimeout,    MUTATION,          () -> Commit.serializer,               () -> ProposeVerbHandler.instance,         PAXOS_PROPOSE_RSP   ),
-    PAXOS_COMMIT_RSP     (95, P2, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    PAXOS_COMMIT_REQ     (35, P2, writeTimeout,    MUTATION,          () -> Commit.serializer,               () -> CommitVerbHandler.instance,          PAXOS_COMMIT_RSP    ),
-
-    TRUNCATE_RSP         (79, P0, truncateTimeout, REQUEST_RESPONSE,  () -> TruncateResponse.serializer,     () -> ResponseVerbHandler.instance                             ),
-    TRUNCATE_REQ         (19, P0, truncateTimeout, MUTATION,          () -> TruncateRequest.serializer,      () -> TruncateVerbHandler.instance,        TRUNCATE_RSP        ),
-
-    COUNTER_MUTATION_RSP (84, P1, counterTimeout,  REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    COUNTER_MUTATION_REQ (24, P2, counterTimeout,  COUNTER_MUTATION,  () -> CounterMutation.serializer,      () -> CounterMutationVerbHandler.instance, COUNTER_MUTATION_RSP),
-
-    READ_RSP             (63, P2, readTimeout,     REQUEST_RESPONSE,  () -> ReadResponse.serializer,         () -> ResponseVerbHandler.instance                             ),
-    READ_REQ             (3,  P3, readTimeout,     READ,              () -> ReadCommand.serializer,          () -> ReadCommandVerbHandler.instance,     READ_RSP            ),
-    RANGE_RSP            (69, P2, rangeTimeout,    REQUEST_RESPONSE,  () -> ReadResponse.serializer,         () -> ResponseVerbHandler.instance                             ),
-    RANGE_REQ            (9,  P3, rangeTimeout,    READ,              () -> ReadCommand.serializer,          () -> ReadCommandVerbHandler.instance,     RANGE_RSP           ),
-
-    GOSSIP_DIGEST_SYN    (14, P0, longTimeout,     GOSSIP,            () -> GossipDigestSyn.serializer,      () -> GossipDigestSynVerbHandler.instance                      ),
-    GOSSIP_DIGEST_ACK    (15, P0, longTimeout,     GOSSIP,            () -> GossipDigestAck.serializer,      () -> GossipDigestAckVerbHandler.instance                      ),
-    GOSSIP_DIGEST_ACK2   (16, P0, longTimeout,     GOSSIP,            () -> GossipDigestAck2.serializer,     () -> GossipDigestAck2VerbHandler.instance                     ),
-    GOSSIP_SHUTDOWN      (29, P0, rpcTimeout,      GOSSIP,            () -> NoPayload.serializer,            () -> GossipShutdownVerbHandler.instance                       ),
-
-    ECHO_RSP             (91, P0, rpcTimeout,      GOSSIP,            () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    ECHO_REQ             (31, P0, rpcTimeout,      GOSSIP,            () -> NoPayload.serializer,            () -> EchoVerbHandler.instance,            ECHO_RSP            ),
-    PING_RSP             (97, P1, pingTimeout,     GOSSIP,            () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    PING_REQ             (37, P1, pingTimeout,     GOSSIP,            () -> PingRequest.serializer,          () -> PingVerbHandler.instance,            PING_RSP            ),
+    MUTATION_RSP           (60,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    MUTATION_REQ           (0,   P3, writeTimeout,    MUTATION,          () -> Mutation.serializer,                  () -> MutationVerbHandler.instance,        MUTATION_RSP        ),
+    HINT_RSP               (61,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    HINT_REQ               (1,   P4, writeTimeout,    MUTATION,          () -> HintMessage.serializer,               () -> HintVerbHandler.instance,            HINT_RSP            ),
+    READ_REPAIR_RSP        (62,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    READ_REPAIR_REQ        (2,   P1, writeTimeout,    MUTATION,          () -> Mutation.serializer,                  () -> ReadRepairVerbHandler.instance,      READ_REPAIR_RSP     ),
+    BATCH_STORE_RSP        (65,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    BATCH_STORE_REQ        (5,   P3, writeTimeout,    MUTATION,          () -> Batch.serializer,                     () -> BatchStoreVerbHandler.instance,      BATCH_STORE_RSP     ),
+    BATCH_REMOVE_RSP       (66,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    BATCH_REMOVE_REQ       (6,   P3, writeTimeout,    MUTATION,          () -> UUIDSerializer.serializer,            () -> BatchRemoveVerbHandler.instance,     BATCH_REMOVE_RSP    ),
+
+    PAXOS_PREPARE_RSP      (93,  P2, writeTimeout,    REQUEST_RESPONSE,  () -> PrepareResponse.serializer,           () -> ResponseVerbHandler.instance                             ),
+    PAXOS_PREPARE_REQ      (33,  P2, writeTimeout,    MUTATION,          () -> Commit.serializer,                    () -> PrepareVerbHandler.instance,         PAXOS_PREPARE_RSP   ),
+    PAXOS_PROPOSE_RSP      (94,  P2, writeTimeout,    REQUEST_RESPONSE,  () -> BooleanSerializer.serializer,         () -> ResponseVerbHandler.instance                             ),
+    PAXOS_PROPOSE_REQ      (34,  P2, writeTimeout,    MUTATION,          () -> Commit.serializer,                    () -> ProposeVerbHandler.instance,         PAXOS_PROPOSE_RSP   ),
+    PAXOS_COMMIT_RSP       (95,  P2, writeTimeout,    REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    PAXOS_COMMIT_REQ       (35,  P2, writeTimeout,    MUTATION,          () -> Commit.serializer,                    () -> CommitVerbHandler.instance,          PAXOS_COMMIT_RSP    ),
+
+    TRUNCATE_RSP           (79,  P0, truncateTimeout, REQUEST_RESPONSE,  () -> TruncateResponse.serializer,          () -> ResponseVerbHandler.instance                             ),
+    TRUNCATE_REQ           (19,  P0, truncateTimeout, MUTATION,          () -> TruncateRequest.serializer,           () -> TruncateVerbHandler.instance,        TRUNCATE_RSP        ),
+
+    COUNTER_MUTATION_RSP   (84,  P1, counterTimeout,  REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    COUNTER_MUTATION_REQ   (24,  P2, counterTimeout,  COUNTER_MUTATION,  () -> CounterMutation.serializer,           () -> CounterMutationVerbHandler.instance, COUNTER_MUTATION_RSP),
+
+    READ_RSP               (63,  P2, readTimeout,     REQUEST_RESPONSE,  () -> ReadResponse.serializer,              () -> ResponseVerbHandler.instance                             ),
+    READ_REQ               (3,   P3, readTimeout,     READ,              () -> ReadCommand.serializer,               () -> ReadCommandVerbHandler.instance,     READ_RSP            ),
+    RANGE_RSP              (69,  P2, rangeTimeout,    REQUEST_RESPONSE,  () -> ReadResponse.serializer,              () -> ResponseVerbHandler.instance                             ),
+    RANGE_REQ              (9,   P3, rangeTimeout,    READ,              () -> ReadCommand.serializer,               () -> ReadCommandVerbHandler.instance,     RANGE_RSP           ),
+
+    GOSSIP_DIGEST_SYN      (14,  P0, longTimeout,     GOSSIP,            () -> GossipDigestSyn.serializer,           () -> GossipDigestSynVerbHandler.instance                      ),
+    GOSSIP_DIGEST_ACK      (15,  P0, longTimeout,     GOSSIP,            () -> GossipDigestAck.serializer,           () -> GossipDigestAckVerbHandler.instance                      ),
+    GOSSIP_DIGEST_ACK2     (16,  P0, longTimeout,     GOSSIP,            () -> GossipDigestAck2.serializer,          () -> GossipDigestAck2VerbHandler.instance                     ),
+    GOSSIP_SHUTDOWN        (29,  P0, rpcTimeout,      GOSSIP,            () -> NoPayload.serializer,                 () -> GossipShutdownVerbHandler.instance                       ),
+
+    ECHO_RSP               (91,  P0, rpcTimeout,      GOSSIP,            () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    ECHO_REQ               (31,  P0, rpcTimeout,      GOSSIP,            () -> NoPayload.serializer,                 () -> EchoVerbHandler.instance,            ECHO_RSP            ),
+    PING_RSP               (97,  P1, pingTimeout,     GOSSIP,            () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    PING_REQ               (37,  P1, pingTimeout,     GOSSIP,            () -> PingRequest.serializer,               () -> PingVerbHandler.instance,            PING_RSP            ),
 
     // P1 because messages can be arbitrarily large or aren't crucial
-    SCHEMA_PUSH_RSP      (98, P1, rpcTimeout,      MIGRATION,         () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    SCHEMA_PUSH_REQ      (18, P1, rpcTimeout,      MIGRATION,         () -> MigrationsSerializer.instance,   () -> SchemaPushVerbHandler.instance,      SCHEMA_PUSH_RSP     ),
-    SCHEMA_PULL_RSP      (88, P1, rpcTimeout,      MIGRATION,         () -> MigrationsSerializer.instance,   () -> ResponseVerbHandler.instance                             ),
-    SCHEMA_PULL_REQ      (28, P1, rpcTimeout,      MIGRATION,         () -> NoPayload.serializer,            () -> SchemaPullVerbHandler.instance,      SCHEMA_PULL_RSP     ),
-    SCHEMA_VERSION_RSP   (80, P1, rpcTimeout,      MIGRATION,         () -> UUIDSerializer.serializer,       () -> ResponseVerbHandler.instance                             ),
-    SCHEMA_VERSION_REQ   (20, P1, rpcTimeout,      MIGRATION,         () -> NoPayload.serializer,            () -> SchemaVersionVerbHandler.instance,   SCHEMA_VERSION_RSP  ),
-    REPAIR_RSP           (92, P1, rpcTimeout,      REQUEST_RESPONSE,  () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    REPAIR_REQ           (32, P1, rpcTimeout,      ANTI_ENTROPY,      () -> RepairMessage.serializer,        () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
-
-    REPLICATION_DONE_RSP (82, P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    REPLICATION_DONE_REQ (22, P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,            () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
-    SNAPSHOT_RSP         (87, P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,            () -> ResponseVerbHandler.instance                             ),
-    SNAPSHOT_REQ         (27, P0, rpcTimeout,      MISC,              () -> SnapshotCommand.serializer,      () -> SnapshotVerbHandler.instance,        SNAPSHOT_RSP        ),
+    SCHEMA_PUSH_RSP        (98,  P1, rpcTimeout,      MIGRATION,         () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    SCHEMA_PUSH_REQ        (18,  P1, rpcTimeout,      MIGRATION,         () -> MigrationsSerializer.instance,        () -> SchemaPushVerbHandler.instance,      SCHEMA_PUSH_RSP     ),
+    SCHEMA_PULL_RSP        (88,  P1, rpcTimeout,      MIGRATION,         () -> MigrationsSerializer.instance,        () -> ResponseVerbHandler.instance                             ),
+    SCHEMA_PULL_REQ        (28,  P1, rpcTimeout,      MIGRATION,         () -> NoPayload.serializer,                 () -> SchemaPullVerbHandler.instance,      SCHEMA_PULL_RSP     ),
+    SCHEMA_VERSION_RSP     (80,  P1, rpcTimeout,      MIGRATION,         () -> UUIDSerializer.serializer,            () -> ResponseVerbHandler.instance                             ),
+    SCHEMA_VERSION_REQ     (20,  P1, rpcTimeout,      MIGRATION,         () -> NoPayload.serializer,                 () -> SchemaVersionVerbHandler.instance,   SCHEMA_VERSION_RSP  ),
+
+    // repair; mostly doesn't use callbacks and sends responses as their own request messages, with matching sessions by uuid; should eventually harmonize and make idiomatic
+    REPAIR_RSP             (100, P1, rpcTimeout,      REQUEST_RESPONSE,  () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    VALIDATION_RSP         (102, P1, rpcTimeout,      ANTI_ENTROPY,      () -> ValidationResponse.serializer,        () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    VALIDATION_REQ         (101, P1, rpcTimeout,      ANTI_ENTROPY,      () -> ValidationRequest.serializer,         () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    SYNC_RSP               (104, P1, rpcTimeout,      ANTI_ENTROPY,      () -> SyncResponse.serializer,              () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    SYNC_REQ               (103, P1, rpcTimeout,      ANTI_ENTROPY,      () -> SyncRequest.serializer,               () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    PREPARE_MSG            (105, P1, rpcTimeout,      ANTI_ENTROPY,      () -> PrepareMessage.serializer,            () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    SNAPSHOT_MSG           (106, P1, rpcTimeout,      ANTI_ENTROPY,      () -> SnapshotMessage.serializer,           () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    CLEANUP_MSG            (107, P1, rpcTimeout,      ANTI_ENTROPY,      () -> CleanupMessage.serializer,            () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout,      ANTI_ENTROPY,      () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout,      ANTI_ENTROPY,      () -> PrepareConsistentRequest.serializer,  () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    FINALIZE_PROPOSE_MSG   (110, P1, rpcTimeout,      ANTI_ENTROPY,      () -> FinalizePropose.serializer,           () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    FINALIZE_PROMISE_MSG   (111, P1, rpcTimeout,      ANTI_ENTROPY,      () -> FinalizePromise.serializer,           () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    FINALIZE_COMMIT_MSG    (112, P1, rpcTimeout,      ANTI_ENTROPY,      () -> FinalizeCommit.serializer,            () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    FAILED_SESSION_MSG     (113, P1, rpcTimeout,      ANTI_ENTROPY,      () -> FailSession.serializer,               () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    STATUS_RSP             (115, P1, rpcTimeout,      ANTI_ENTROPY,      () -> StatusResponse.serializer,            () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    STATUS_REQ             (114, P1, rpcTimeout,      ANTI_ENTROPY,      () -> StatusRequest.serializer,             () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+    ASYMMETRIC_SYNC_REQ    (116, P1, rpcTimeout,      ANTI_ENTROPY,      () -> AsymmetricSyncRequest.serializer,     () -> RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
+
+    REPLICATION_DONE_RSP   (82,  P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    REPLICATION_DONE_REQ   (22,  P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,                 () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
+    SNAPSHOT_RSP           (87,  P0, rpcTimeout,      MISC,              () -> NoPayload.serializer,                 () -> ResponseVerbHandler.instance                             ),
+    SNAPSHOT_REQ           (27,  P0, rpcTimeout,      MISC,              () -> SnapshotCommand.serializer,           () -> SnapshotVerbHandler.instance,        SNAPSHOT_RSP        ),
 
     // generic failure response
-    FAILURE_RSP          (99, P0, noTimeout,       REQUEST_RESPONSE,  () -> RequestFailureReason.serializer, () -> ResponseVerbHandler.instance                             ),
+    FAILURE_RSP            (99,  P0, noTimeout,       REQUEST_RESPONSE,  () -> RequestFailureReason.serializer,      () -> ResponseVerbHandler.instance                             ),
 
     // dummy verbs
-    _TRACE               (30, P1, rpcTimeout,      TRACING,           () -> NoPayload.serializer,            () -> null                                                     ),
-    _SAMPLE              (42, P1, rpcTimeout,      INTERNAL_RESPONSE, () -> NoPayload.serializer,            () -> null                                                     ),
-    _TEST_1              (10, P0, writeTimeout,    IMMEDIATE,         () -> NoPayload.serializer,            () -> null                                                     ),
-    _TEST_2              (11, P1, rpcTimeout,      IMMEDIATE,         () -> NoPayload.serializer,            () -> null                                                     ),
+    _TRACE                 (30,  P1, rpcTimeout,      TRACING,           () -> NoPayload.serializer,                 () -> null                                                     ),
+    _SAMPLE                (42,  P1, rpcTimeout,      INTERNAL_RESPONSE, () -> NoPayload.serializer,                 () -> null                                                     ),
+    _TEST_1                (10,  P0, writeTimeout,    IMMEDIATE,         () -> NoPayload.serializer,                 () -> null                                                     ),
+    _TEST_2                (11,  P1, rpcTimeout,      IMMEDIATE,         () -> NoPayload.serializer,                 () -> null                                                     ),
 
     @Deprecated
-    REQUEST_RSP          (4,  P1, rpcTimeout,      REQUEST_RESPONSE,  () -> null,                            () -> ResponseVerbHandler.instance                             ),
+    REQUEST_RSP            (4,   P1, rpcTimeout,      REQUEST_RESPONSE,  () -> null,                                 () -> ResponseVerbHandler.instance                             ),
     @Deprecated
-    INTERNAL_RSP         (23, P1, rpcTimeout,      INTERNAL_RESPONSE, () -> null,                            () -> ResponseVerbHandler.instance                             ),
+    INTERNAL_RSP           (23,  P1, rpcTimeout,      INTERNAL_RESPONSE, () -> null,                                 () -> ResponseVerbHandler.instance                             ),
 
-    // largest used ID: 99
+    // largest used ID: 116
     ;
 
     public static final List<Verb> VERBS = ImmutableList.copyOf(Verb.values());
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 68a5824..cf6d84b 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.ASYMMETRIC_SYNC_REQ;
 
 /**
  * AsymmetricRemoteSyncTask sends {@link AsymmetricSyncRequest} to target node to repair(stream)
@@ -53,7 +53,7 @@ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRem
         AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind);
         String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
         Tracing.traceRepair(message);
-        MessagingService.instance().send(Message.out(REPAIR_REQ, request), request.fetchingNode);
+        MessagingService.instance().send(Message.out(ASYMMETRIC_SYNC_REQ, request), request.fetchingNode);
     }
 
     public void syncComplete(boolean success, List<SessionSummary> summaries)
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2a87fa2..27ffd05 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_RSP;
 
 /**
  * Handles all repair related message.
@@ -62,9 +62,9 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
         RepairJobDesc desc = message.payload.desc;
         try
         {
-            switch (message.payload.messageType)
+            switch (message.verb())
             {
-                case PREPARE_MESSAGE:
+                case PREPARE_MSG:
                     PrepareMessage prepareMessage = (PrepareMessage) message.payload;
                     logger.debug("Preparing, {}", prepareMessage);
                     List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.tableIds.size());
@@ -90,7 +90,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     MessagingService.instance().send(message.emptyResponse(), message.from());
                     break;
 
-                case SNAPSHOT:
+                case SNAPSHOT_MSG:
                     logger.debug("Snapshotting {}", desc);
                     final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
                     if (cfs == null)
@@ -114,7 +114,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     MessagingService.instance().send(message.emptyResponse(), message.from());
                     break;
 
-                case VALIDATION_REQUEST:
+                case VALIDATION_REQ:
                     ValidationRequest validationRequest = (ValidationRequest) message.payload;
                     logger.debug("Validating {}", validationRequest);
                     // trigger read-only compaction
@@ -122,7 +122,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     if (store == null)
                     {
                         logger.error("Table {}.{} was dropped during snapshot phase of repair", desc.keyspace, desc.columnFamily);
-                        MessagingService.instance().send(Message.out(REPAIR_REQ, new ValidationComplete(desc)), message.from());
+                        MessagingService.instance().send(Message.out(VALIDATION_RSP, new ValidationResponse(desc)), message.from());
                         return;
                     }
 
@@ -132,7 +132,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     ValidationManager.instance.submitValidation(store, validator);
                     break;
 
-                case SYNC_REQUEST:
+                case SYNC_REQ:
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
@@ -147,7 +147,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     task.run();
                     break;
 
-                case ASYMMETRIC_SYNC_REQUEST:
+                case ASYMMETRIC_SYNC_REQ:
                     // forwarded sync request
                     AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload;
                     logger.debug("Syncing {}", asymmetricSyncRequest);
@@ -162,49 +162,49 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     asymmetricTask.run();
                     break;
 
-                case CLEANUP:
+                case CLEANUP_MSG:
                     logger.debug("cleaning up repair");
                     CleanupMessage cleanup = (CleanupMessage) message.payload;
                     ActiveRepairService.instance.removeParentRepairSession(cleanup.parentRepairSession);
                     MessagingService.instance().send(message.emptyResponse(), message.from());
                     break;
 
-                case CONSISTENT_REQUEST:
+                case PREPARE_CONSISTENT_REQ:
                     ActiveRepairService.instance.consistent.local.handlePrepareMessage(message.from(), (PrepareConsistentRequest) message.payload);
                     break;
 
-                case CONSISTENT_RESPONSE:
+                case PREPARE_CONSISTENT_RSP:
                     ActiveRepairService.instance.consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse) message.payload);
                     break;
 
-                case FINALIZE_PROPOSE:
+                case FINALIZE_PROPOSE_MSG:
                     ActiveRepairService.instance.consistent.local.handleFinalizeProposeMessage(message.from(), (FinalizePropose) message.payload);
                     break;
 
-                case FINALIZE_PROMISE:
+                case FINALIZE_PROMISE_MSG:
                     ActiveRepairService.instance.consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise) message.payload);
                     break;
 
-                case FINALIZE_COMMIT:
+                case FINALIZE_COMMIT_MSG:
                     ActiveRepairService.instance.consistent.local.handleFinalizeCommitMessage(message.from(), (FinalizeCommit) message.payload);
                     break;
 
-                case FAILED_SESSION:
+                case FAILED_SESSION_MSG:
                     FailSession failure = (FailSession) message.payload;
                     ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
                     ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(), failure);
                     break;
 
-                case STATUS_REQUEST:
+                case STATUS_REQ:
                     ActiveRepairService.instance.consistent.local.handleStatusRequest(message.from(), (StatusRequest) message.payload);
                     break;
 
-                case STATUS_RESPONSE:
+                case STATUS_RSP:
                     ActiveRepairService.instance.consistent.local.handleStatusResponse(message.from(), (StatusResponse) message.payload);
                     break;
 
                 default:
-                    ActiveRepairService.instance.handleMessage(message.from(), message.payload);
+                    ActiveRepairService.instance.handleMessage(message);
                     break;
             }
         }
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index fab4b28..40e4b3d 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SnapshotMessage;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.SNAPSHOT_MSG;
 
 /**
  * SnapshotTask is a task that sends snapshot request.
@@ -46,7 +46,7 @@ public class SnapshotTask extends AbstractFuture<InetAddressAndPort> implements
 
     public void run()
     {
-        MessagingService.instance().sendWithCallback(Message.out(REPAIR_REQ, new SnapshotMessage(desc)),
+        MessagingService.instance().sendWithCallback(Message.out(SNAPSHOT_MSG, new SnapshotMessage(desc)),
                                                      endpoint,
                                                      new SnapshotCallback(this));
     }
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 53407c9..827dce3 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.repair.messages.SyncResponse;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
@@ -39,11 +39,11 @@ import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamOperation;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.SYNC_RSP;
 
 /**
  * StreamingRepairTask performs data streaming between two remote replicas, neither of which is repair coordinator.
- * Task will send {@link SyncComplete} message back to coordinator upon streaming completion.
+ * Task will send {@link SyncResponse} message back to coordinator upon streaming completion.
  */
 public class StreamingRepairTask implements Runnable, StreamEventHandler
 {
@@ -103,7 +103,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
     public void onSuccess(StreamState state)
     {
         logger.info("[repair #{}] streaming task succeed, returning response to {}", desc.sessionId, initiator);
-        MessagingService.instance().send(Message.out(REPAIR_REQ, new SyncComplete(desc, src, dst, true, state.createSummaries())), initiator);
+        MessagingService.instance().send(Message.out(SYNC_RSP, new SyncResponse(desc, src, dst, true, state.createSummaries())), initiator);
     }
 
     /**
@@ -111,6 +111,6 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
      */
     public void onFailure(Throwable t)
     {
-        MessagingService.instance().send(Message.out(REPAIR_REQ, new SyncComplete(desc, src, dst, false, Collections.emptyList())), initiator);
+        MessagingService.instance().send(Message.out(SYNC_RSP, new SyncResponse(desc, src, dst, false, Collections.emptyList())), initiator);
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index b608d67..181554a 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.SYNC_REQ;
 
 /**
  * SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node
@@ -53,9 +53,9 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo
         super(desc, r1, r2, differences, previewKind);
     }
 
-    void sendRequest(RepairMessage request, InetAddressAndPort to)
+    void sendRequest(SyncRequest request, InetAddressAndPort to)
     {
-        MessagingService.instance().send(Message.out(REPAIR_REQ, request), to);
+        MessagingService.instance().send(Message.out(SYNC_REQ, request), to);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 1892a59..0161acf 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.MerkleTrees;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
 
 /**
  * ValidationTask sends {@link ValidationRequest} to a replica.
@@ -54,7 +54,7 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn
     public void run()
     {
         ValidationRequest request = new ValidationRequest(desc, nowInSec);
-        MessagingService.instance().send(Message.out(REPAIR_REQ, request), endpoint);
+        MessagingService.instance().send(Message.out(VALIDATION_REQ, request), endpoint);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 9a89fa6..0d6fdb4 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.repair.messages.ValidationResponse;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.tracing.Tracing;
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTree.RowHash;
 import org.apache.cassandra.utils.MerkleTrees;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.VALIDATION_RSP;
 
 /**
  * Handles the building of a merkle tree for a column family.
@@ -391,7 +391,7 @@ public class Validator implements Runnable
     public void fail()
     {
         logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator);
-        respond(new ValidationComplete(desc));
+        respond(new ValidationResponse(desc));
     }
 
     /**
@@ -410,7 +410,7 @@ public class Validator implements Runnable
             Tracing.traceRepair("Local completed merkle tree for {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
 
         }
-        respond(new ValidationComplete(desc, trees));
+        respond(new ValidationResponse(desc, trees));
     }
 
     private boolean initiatorIsRemote()
@@ -418,11 +418,11 @@ public class Validator implements Runnable
         return !FBUtilities.getBroadcastAddressAndPort().equals(initiator);
     }
 
-    private void respond(ValidationComplete response)
+    private void respond(ValidationResponse response)
     {
         if (initiatorIsRemote())
         {
-            MessagingService.instance().send(Message.out(REPAIR_REQ, response), initiator);
+            MessagingService.instance().send(Message.out(VALIDATION_RSP, response), initiator);
             return;
         }
 
@@ -434,7 +434,7 @@ public class Validator implements Runnable
          */
         StageManager.getStage(Stage.ANTI_ENTROPY).execute(() ->
         {
-            ValidationComplete movedResponse = response;
+            ValidationResponse movedResponse = response;
             try
             {
                 movedResponse = response.tryMoveOffHeap();
@@ -443,7 +443,7 @@ public class Validator implements Runnable
             {
                 logger.error("Failed to move local merkle tree for {} off heap", desc, e);
             }
-            ActiveRepairService.instance.handleMessage(initiator, movedResponse);
+            ActiveRepairService.instance.handleMessage(Message.out(VALIDATION_RSP, movedResponse));
         });
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index 28f5d08..8f1759a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -139,11 +139,10 @@ public class CoordinatorSession extends ConsistentSession
         return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED);
     }
 
-    protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+    protected void sendMessage(InetAddressAndPort destination, Message<RepairMessage> message)
     {
-        logger.trace("Sending {} to {}", message, destination);
-        Message<RepairMessage> messageOut = Message.out(Verb.REPAIR_REQ, message);
-        MessagingService.instance().send(messageOut, destination);
+        logger.trace("Sending {} to {}", message.payload, destination);
+        MessagingService.instance().send(message, destination);
     }
 
     public ListenableFuture<Boolean> prepare()
@@ -151,7 +150,8 @@ public class CoordinatorSession extends ConsistentSession
         Preconditions.checkArgument(allStates(State.PREPARING));
 
         logger.debug("Beginning prepare phase of incremental repair session {}", sessionID);
-        PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants);
+        Message<RepairMessage> message =
+            Message.out(Verb.PREPARE_CONSISTENT_REQ, new PrepareConsistentRequest(sessionID, coordinator, participants));
         for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
@@ -198,7 +198,7 @@ public class CoordinatorSession extends ConsistentSession
     {
         Preconditions.checkArgument(allStates(State.REPAIRING));
         logger.debug("Proposing finalization of repair session {}", sessionID);
-        FinalizePropose message = new FinalizePropose(sessionID);
+        Message<RepairMessage> message = Message.out(Verb.FINALIZE_PROPOSE_MSG, new FinalizePropose(sessionID));
         for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
@@ -234,7 +234,7 @@ public class CoordinatorSession extends ConsistentSession
     {
         Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
         logger.debug("Committing finalization of repair session {}", sessionID);
-        FinalizeCommit message = new FinalizeCommit(sessionID);
+        Message<RepairMessage> message = Message.out(Verb.FINALIZE_COMMIT_MSG, new FinalizeCommit(sessionID));
         for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
@@ -245,7 +245,7 @@ public class CoordinatorSession extends ConsistentSession
 
     private void sendFailureMessageToParticipants()
     {
-        FailSession message = new FailSession(sessionID);
+        Message<RepairMessage> message = Message.out(Verb.FAILED_SESSION_MSG, new FailSession(sessionID));
         for (final InetAddressAndPort participant : participants)
         {
             if (participantStates.get(participant) != State.FAILED)
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index e93ccb0..935bba8 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -88,7 +88,11 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.FAILED_SESSION_MSG;
+import static org.apache.cassandra.net.Verb.FINALIZE_PROMISE_MSG;
+import static org.apache.cassandra.net.Verb.PREPARE_CONSISTENT_RSP;
+import static org.apache.cassandra.net.Verb.STATUS_REQ;
+import static org.apache.cassandra.net.Verb.STATUS_RSP;
 import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
 
 /**
@@ -190,10 +194,11 @@ public class LocalSessions
                                     sessionID, session.coordinator);
 
         setStateAndSave(session, FAILED);
+        Message<FailSession> message = Message.out(FAILED_SESSION_MSG, new FailSession(sessionID));
         for (InetAddressAndPort participant : session.participants)
         {
             if (!participant.equals(getBroadcastAddressAndPort()))
-                sendMessage(participant, new FailSession(sessionID));
+                sendMessage(participant, message);
         }
     }
 
@@ -491,11 +496,10 @@ public class LocalSessions
         return ActiveRepairService.instance.getParentRepairSession(sessionID);
     }
 
-    protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+    protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message)
     {
-        logger.trace("sending {} to {}", message, destination);
-        Message<RepairMessage> messageOut = Message.out(REPAIR_REQ, message);
-        MessagingService.instance().send(messageOut, destination);
+        logger.trace("sending {} to {}", message.payload, destination);
+        MessagingService.instance().send(message, destination);
     }
 
     private void setStateAndSave(LocalSession session, ConsistentSession.State state)
@@ -538,7 +542,7 @@ public class LocalSessions
             }
             if (sendMessage)
             {
-                sendMessage(session.coordinator, new FailSession(sessionID));
+                sendMessage(session.coordinator, Message.out(FAILED_SESSION_MSG, new FailSession(sessionID)));
             }
         }
     }
@@ -609,7 +613,7 @@ public class LocalSessions
         catch (Throwable e)
         {
             logger.error("Error retrieving ParentRepairSession for session {}, responding with failure", sessionID);
-            sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
+            sendMessage(coordinator, Message.out(PREPARE_CONSISTENT_RSP, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
             return;
         }
 
@@ -632,15 +636,14 @@ public class LocalSessions
                 {
                     logger.info("Prepare phase for incremental repair session {} completed", sessionID);
                     if (session.getState() != FAILED)
-                    {
                         setStateAndSave(session, PREPARED);
-                        sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
-                    }
                     else
-                    {
                         logger.info("Session {} failed before anticompaction completed", sessionID);
-                        sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
-                    }
+
+                    Message<PrepareConsistentResponse> message =
+                        Message.out(PREPARE_CONSISTENT_RSP,
+                                    new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), session.getState() != FAILED));
+                    sendMessage(coordinator, message);
                 }
                 finally
                 {
@@ -653,7 +656,9 @@ public class LocalSessions
                 try
                 {
                     logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
-                    sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
+                    sendMessage(coordinator,
+                                Message.out(PREPARE_CONSISTENT_RSP,
+                                            new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)));
                     failSession(sessionID, false);
                 }
                 finally
@@ -682,7 +687,7 @@ public class LocalSessions
         if (session == null)
         {
             logger.debug("Received FinalizePropose message for unknown repair session {}, responding with failure", sessionID);
-            sendMessage(from, new FailSession(sessionID));
+            sendMessage(from, Message.out(FAILED_SESSION_MSG, new FailSession(sessionID)));
             return;
         }
 
@@ -699,7 +704,7 @@ public class LocalSessions
              */
             syncTable();
 
-            sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true));
+            sendMessage(from, Message.out(FINALIZE_PROMISE_MSG, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true)));
             logger.debug("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", sessionID);
         }
         catch (IllegalArgumentException e)
@@ -753,7 +758,8 @@ public class LocalSessions
     public void sendStatusRequest(LocalSession session)
     {
         logger.debug("Attempting to learn the outcome of unfinished local incremental repair session {}", session.sessionID);
-        StatusRequest request = new StatusRequest(session.sessionID);
+        Message<StatusRequest> request = Message.out(STATUS_REQ, new StatusRequest(session.sessionID));
+
         for (InetAddressAndPort participant : session.participants)
         {
             if (!getBroadcastAddressAndPort().equals(participant) && isAlive(participant))
@@ -771,11 +777,11 @@ public class LocalSessions
         if (session == null)
         {
             logger.warn("Received status response message for unknown session {}", sessionID);
-            sendMessage(from, new StatusResponse(sessionID, FAILED));
+            sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, FAILED)));
         }
         else
         {
-            sendMessage(from, new StatusResponse(sessionID, session.getState()));
+            sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, session.getState())));
             logger.debug("Responding to status response message for incremental repair session {} with local state {}", sessionID, session.getState());
        }
     }
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
index 0a6d257..eacc285 100644
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -38,8 +39,6 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
 
 public class AsymmetricSyncRequest extends RepairMessage
 {
-    public static MessageSerializer serializer = new SyncRequestSerializer();
-
     public final InetAddressAndPort initiator;
     public final InetAddressAndPort fetchingNode;
     public final InetAddressAndPort fetchFrom;
@@ -48,7 +47,7 @@ public class AsymmetricSyncRequest extends RepairMessage
 
     public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
     {
-        super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
+        super(desc);
         this.initiator = initiator;
         this.fetchingNode = fetchingNode;
         this.fetchFrom = fetchFrom;
@@ -62,8 +61,7 @@ public class AsymmetricSyncRequest extends RepairMessage
         if (!(o instanceof AsymmetricSyncRequest))
             return false;
         AsymmetricSyncRequest req = (AsymmetricSyncRequest)o;
-        return messageType == req.messageType &&
-               desc.equals(req.desc) &&
+        return desc.equals(req.desc) &&
                initiator.equals(req.initiator) &&
                fetchingNode.equals(req.fetchingNode) &&
                fetchFrom.equals(req.fetchFrom) &&
@@ -73,10 +71,10 @@ public class AsymmetricSyncRequest extends RepairMessage
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, desc, initiator, fetchingNode, fetchFrom, ranges);
+        return Objects.hash(desc, initiator, fetchingNode, fetchFrom, ranges);
     }
 
-    public static class SyncRequestSerializer implements MessageSerializer<AsymmetricSyncRequest>
+    public static final IVersionedSerializer<AsymmetricSyncRequest> serializer = new IVersionedSerializer<AsymmetricSyncRequest>()
     {
         public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
         {
@@ -119,7 +117,7 @@ public class AsymmetricSyncRequest extends RepairMessage
             size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
             return size;
         }
-    }
+    };
 
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
index 69d147a..5ec7fc6 100644
--- a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Objects;
 import java.util.UUID;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -32,12 +33,11 @@ import org.apache.cassandra.utils.UUIDSerializer;
  */
 public class CleanupMessage extends RepairMessage
 {
-    public static MessageSerializer serializer = new CleanupMessageSerializer();
     public final UUID parentRepairSession;
 
     public CleanupMessage(UUID parentRepairSession)
     {
-        super(Type.CLEANUP, null);
+        super(null);
         this.parentRepairSession = parentRepairSession;
     }
 
@@ -47,17 +47,16 @@ public class CleanupMessage extends RepairMessage
         if (!(o instanceof CleanupMessage))
             return false;
         CleanupMessage other = (CleanupMessage) o;
-        return messageType == other.messageType &&
-               parentRepairSession.equals(other.parentRepairSession);
+        return parentRepairSession.equals(other.parentRepairSession);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, parentRepairSession);
+        return Objects.hash(parentRepairSession);
     }
 
-    public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage>
+    public static final IVersionedSerializer<CleanupMessage> serializer = new IVersionedSerializer<CleanupMessage>()
     {
         public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException
         {
@@ -74,5 +73,5 @@ public class CleanupMessage extends RepairMessage
         {
             return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
         }
-    }
+    };
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/FailSession.java b/src/java/org/apache/cassandra/repair/messages/FailSession.java
index 1227cc3..b8c7ad3 100644
--- a/src/java/org/apache/cassandra/repair/messages/FailSession.java
+++ b/src/java/org/apache/cassandra/repair/messages/FailSession.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class FailSession extends RepairMessage
 
     public FailSession(UUID sessionID)
     {
-        super(Type.FAILED_SESSION, null);
+        super(null);
         assert sessionID != null;
         this.sessionID = sessionID;
     }
@@ -51,7 +52,7 @@ public class FailSession extends RepairMessage
         return sessionID.hashCode();
     }
 
-    public static final MessageSerializer serializer = new MessageSerializer<FailSession>()
+    public static final IVersionedSerializer<FailSession> serializer = new IVersionedSerializer<FailSession>()
     {
         public void serialize(FailSession msg, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
index a4eb111..bb5cca7 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizeCommit.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class FinalizeCommit extends RepairMessage
 
     public FinalizeCommit(UUID sessionID)
     {
-        super(Type.FINALIZE_COMMIT, null);
+        super(null);
         assert sessionID != null;
         this.sessionID = sessionID;
     }
@@ -58,7 +59,7 @@ public class FinalizeCommit extends RepairMessage
                '}';
     }
 
-    public static MessageSerializer serializer = new MessageSerializer<FinalizeCommit>()
+    public static final IVersionedSerializer<FinalizeCommit> serializer = new IVersionedSerializer<FinalizeCommit>()
     {
         public void serialize(FinalizeCommit msg, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index 07e7e0d..cfdc07c 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,7 +38,7 @@ public class FinalizePromise extends RepairMessage
 
     public FinalizePromise(UUID sessionID, InetAddressAndPort participant, boolean promised)
     {
-        super(Type.FINALIZE_PROMISE, null);
+        super(null);
         assert sessionID != null;
         assert participant != null;
         this.sessionID = sessionID;
@@ -65,7 +66,7 @@ public class FinalizePromise extends RepairMessage
         return result;
     }
 
-    public static MessageSerializer serializer = new MessageSerializer<FinalizePromise>()
+    public static final IVersionedSerializer<FinalizePromise> serializer = new IVersionedSerializer<FinalizePromise>()
     {
         public void serialize(FinalizePromise msg, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
index c0c49df..c21dd78 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePropose.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class FinalizePropose extends RepairMessage
 
     public FinalizePropose(UUID sessionID)
     {
-        super(Type.FINALIZE_PROPOSE, null);
+        super(null);
         assert sessionID != null;
         this.sessionID = sessionID;
     }
@@ -58,7 +59,7 @@ public class FinalizePropose extends RepairMessage
                '}';
     }
 
-    public static MessageSerializer serializer = new MessageSerializer<FinalizePropose>()
+    public static final IVersionedSerializer<FinalizePropose> serializer = new IVersionedSerializer<FinalizePropose>()
     {
         public void serialize(FinalizePropose msg, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index b1e9b04..c1be082 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -41,7 +42,7 @@ public class PrepareConsistentRequest extends RepairMessage
 
     public PrepareConsistentRequest(UUID parentSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> participants)
     {
-        super(Type.CONSISTENT_REQUEST, null);
+        super(null);
         assert parentSession != null;
         assert coordinator != null;
         assert participants != null && !participants.isEmpty();
@@ -79,9 +80,8 @@ public class PrepareConsistentRequest extends RepairMessage
                '}';
     }
 
-    public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentRequest>()
+    public static final IVersionedSerializer<PrepareConsistentRequest> serializer = new IVersionedSerializer<PrepareConsistentRequest>()
     {
-
         public void serialize(PrepareConsistentRequest request, DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(request.parentSession, out, version);
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index 3362a40..00de77d 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,7 +38,7 @@ public class PrepareConsistentResponse extends RepairMessage
 
     public PrepareConsistentResponse(UUID parentSession, InetAddressAndPort participant, boolean success)
     {
-        super(Type.CONSISTENT_RESPONSE, null);
+        super(null);
         assert parentSession != null;
         assert participant != null;
         this.parentSession = parentSession;
@@ -65,7 +66,7 @@ public class PrepareConsistentResponse extends RepairMessage
         return result;
     }
 
-    public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentResponse>()
+    public static final IVersionedSerializer<PrepareConsistentResponse> serializer = new IVersionedSerializer<PrepareConsistentResponse>()
     {
         public void serialize(PrepareConsistentResponse response, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 5a0701c..9c485bc 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -24,10 +24,13 @@ import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
@@ -38,7 +41,6 @@ import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareMessage extends RepairMessage
 {
-    public final static MessageSerializer serializer = new PrepareMessageSerializer();
     public final List<TableId> tableIds;
     public final Collection<Range<Token>> ranges;
 
@@ -50,7 +52,7 @@ public class PrepareMessage extends RepairMessage
 
     public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal, PreviewKind previewKind)
     {
-        super(Type.PREPARE_MESSAGE, null);
+        super(null);
         this.parentRepairSession = parentRepairSession;
         this.tableIds = tableIds;
         this.ranges = ranges;
@@ -66,8 +68,7 @@ public class PrepareMessage extends RepairMessage
         if (!(o instanceof PrepareMessage))
             return false;
         PrepareMessage other = (PrepareMessage) o;
-        return messageType == other.messageType &&
-               parentRepairSession.equals(other.parentRepairSession) &&
+        return parentRepairSession.equals(other.parentRepairSession) &&
                isIncremental == other.isIncremental &&
                isGlobal == other.isGlobal &&
                previewKind == other.previewKind &&
@@ -79,13 +80,18 @@ public class PrepareMessage extends RepairMessage
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, parentRepairSession, isGlobal, previewKind, isIncremental, timestamp, tableIds, ranges);
+        return Objects.hash(parentRepairSession, isGlobal, previewKind, isIncremental, timestamp, tableIds, ranges);
     }
 
-    public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage>
+    private static final String MIXED_MODE_ERROR = "Some nodes involved in repair are on an incompatible major version. " +
+                                                   "Repair is not supported in mixed major version clusters.";
+
+    public static final IVersionedSerializer<PrepareMessage> serializer = new IVersionedSerializer<PrepareMessage>()
     {
         public void serialize(PrepareMessage message, DataOutputPlus out, int version) throws IOException
         {
+            Preconditions.checkArgument(version == MessagingService.current_version, MIXED_MODE_ERROR);
+
             out.writeInt(message.tableIds.size());
             for (TableId tableId : message.tableIds)
                 tableId.serialize(out);
@@ -104,6 +110,8 @@ public class PrepareMessage extends RepairMessage
 
         public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException
         {
+            Preconditions.checkArgument(version == MessagingService.current_version, MIXED_MODE_ERROR);
+
             int tableIdCount = in.readInt();
             List<TableId> tableIds = new ArrayList<>(tableIdCount);
             for (int i = 0; i < tableIdCount; i++)
@@ -122,6 +130,8 @@ public class PrepareMessage extends RepairMessage
 
         public long serializedSize(PrepareMessage message, int version)
         {
+            Preconditions.checkArgument(version == MessagingService.current_version, MIXED_MODE_ERROR);
+
             long size;
             size = TypeSizes.sizeof(message.tableIds.size());
             for (TableId tableId : message.tableIds)
@@ -136,7 +146,7 @@ public class PrepareMessage extends RepairMessage
             size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
             return size;
         }
-    }
+    };
 
     @Override
     public String toString()
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index db1a134..3137b4e 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -17,14 +17,6 @@
  */
 package org.apache.cassandra.repair.messages;
 
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
 
 /**
@@ -34,85 +26,10 @@ import org.apache.cassandra.repair.RepairJobDesc;
  */
 public abstract class RepairMessage
 {
-    public static final IVersionedSerializer<RepairMessage> serializer = new RepairMessageSerializer();
-
-    public static interface MessageSerializer<T extends RepairMessage> extends IVersionedSerializer<T> {}
-
-    public static final int MIN_MESSAGING_VERSION = MessagingService.VERSION_40;
-    private static final String MIXED_MODE_ERROR = "Some nodes involved in repair are on an incompatible major version. " +
-                                                   "Repair is not supported in mixed major version clusters.";
-
-    public enum Type
-    {
-        VALIDATION_REQUEST(0, ValidationRequest.serializer),
-        VALIDATION_COMPLETE(1, ValidationComplete.serializer),
-        SYNC_REQUEST(2, SyncRequest.serializer),
-        SYNC_COMPLETE(3, SyncComplete.serializer),
-        PREPARE_MESSAGE(5, PrepareMessage.serializer),
-        SNAPSHOT(6, SnapshotMessage.serializer),
-        CLEANUP(7, CleanupMessage.serializer),
-
-        CONSISTENT_REQUEST(8, PrepareConsistentRequest.serializer),
-        CONSISTENT_RESPONSE(9, PrepareConsistentResponse.serializer),
-        FINALIZE_PROPOSE(10, FinalizePropose.serializer),
-        FINALIZE_PROMISE(11, FinalizePromise.serializer),
-        FINALIZE_COMMIT(12, FinalizeCommit.serializer),
-        FAILED_SESSION(13, FailSession.serializer),
-        STATUS_REQUEST(14, StatusRequest.serializer),
-        STATUS_RESPONSE(15, StatusResponse.serializer),
-        ASYMMETRIC_SYNC_REQUEST(16, AsymmetricSyncRequest.serializer);
-
-        private final byte type;
-        private final MessageSerializer<RepairMessage> serializer;
-
-        Type(int type, MessageSerializer<RepairMessage> serializer)
-        {
-            this.type = (byte) type;
-            this.serializer = serializer;
-        }
-
-        public static Type fromByte(byte b)
-        {
-            for (Type t : values())
-            {
-               if (t.type == b)
-                   return t;
-            }
-            throw new IllegalArgumentException("Unknown RepairMessage.Type: " + b);
-        }
-    }
-
-    public final Type messageType;
     public final RepairJobDesc desc;
 
-    protected RepairMessage(Type messageType, RepairJobDesc desc)
+    protected RepairMessage(RepairJobDesc desc)
     {
-        this.messageType = messageType;
         this.desc = desc;
     }
-
-    public static class RepairMessageSerializer implements MessageSerializer<RepairMessage>
-    {
-        public void serialize(RepairMessage message, DataOutputPlus out, int version) throws IOException
-        {
-            Preconditions.checkArgument(version >= MIN_MESSAGING_VERSION, MIXED_MODE_ERROR);
-            out.write(message.messageType.type);
-            message.messageType.serializer.serialize(message, out, version);
-        }
-
-        public RepairMessage deserialize(DataInputPlus in, int version) throws IOException
-        {
-            Preconditions.checkArgument(version >= MIN_MESSAGING_VERSION, MIXED_MODE_ERROR);
-            RepairMessage.Type messageType = RepairMessage.Type.fromByte(in.readByte());
-            return messageType.serializer.deserialize(in, version);
-        }
-
-        public long serializedSize(RepairMessage message, int version)
-        {
-            Preconditions.checkArgument(version >= MIN_MESSAGING_VERSION, MIXED_MODE_ERROR);
-            long size = 1; // for messageType byte
-            size += message.messageType.serializer.serializedSize(message, version);
-            return size;
-        }
-    }
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
index d4737d3..c18950a 100644
--- a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
@@ -20,17 +20,16 @@ package org.apache.cassandra.repair.messages;
 import java.io.IOException;
 import java.util.Objects;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
 
 public class SnapshotMessage extends RepairMessage
 {
-    public final static MessageSerializer serializer = new SnapshotMessageSerializer();
-
     public SnapshotMessage(RepairJobDesc desc)
     {
-        super(Type.SNAPSHOT, desc);
+        super(desc);
     }
 
     @Override
@@ -39,16 +38,16 @@ public class SnapshotMessage extends RepairMessage
         if (!(o instanceof SnapshotMessage))
             return false;
         SnapshotMessage other = (SnapshotMessage) o;
-        return messageType == other.messageType;
+        return desc.equals(other.desc);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType);
+        return Objects.hash(desc);
     }
 
-    public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage>
+    public static final IVersionedSerializer<SnapshotMessage> serializer = new IVersionedSerializer<SnapshotMessage>()
     {
         public void serialize(SnapshotMessage message, DataOutputPlus out, int version) throws IOException
         {
@@ -65,5 +64,5 @@ public class SnapshotMessage extends RepairMessage
         {
             return RepairJobDesc.serializer.serializedSize(message.desc, version);
         }
-    }
+    };
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/StatusRequest.java b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
index f6a2b82..09354e6 100644
--- a/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/StatusRequest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair.messages;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -31,7 +32,7 @@ public class StatusRequest extends RepairMessage
 
     public StatusRequest(UUID sessionID)
     {
-        super(Type.STATUS_REQUEST, null);
+        super(null);
         this.sessionID = sessionID;
     }
 
@@ -57,7 +58,7 @@ public class StatusRequest extends RepairMessage
                '}';
     }
 
-    public static MessageSerializer serializer = new MessageSerializer<StatusRequest>()
+    public static final IVersionedSerializer<StatusRequest> serializer = new IVersionedSerializer<StatusRequest>()
     {
         public void serialize(StatusRequest msg, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/StatusResponse.java b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
index 99eb76b..e62d337 100644
--- a/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/StatusResponse.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.consistent.ConsistentSession;
@@ -34,7 +35,7 @@ public class StatusResponse extends RepairMessage
 
     public StatusResponse(UUID sessionID, ConsistentSession.State state)
     {
-        super(Type.STATUS_RESPONSE, null);
+        super(null);
         assert sessionID != null;
         assert state != null;
         this.sessionID = sessionID;
@@ -67,7 +68,7 @@ public class StatusResponse extends RepairMessage
                '}';
     }
 
-    public static final MessageSerializer serializer = new MessageSerializer<StatusResponse>()
+    public static final IVersionedSerializer<StatusResponse> serializer = new IVersionedSerializer<StatusResponse>()
     {
         public void serialize(StatusResponse msg, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 71fcdb0..341455f 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -44,8 +45,6 @@ import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAdd
  */
 public class SyncRequest extends RepairMessage
 {
-    public static MessageSerializer serializer = new SyncRequestSerializer();
-
     public final InetAddressAndPort initiator;
     public final InetAddressAndPort src;
     public final InetAddressAndPort dst;
@@ -54,7 +53,7 @@ public class SyncRequest extends RepairMessage
 
    public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
    {
-        super(Type.SYNC_REQUEST, desc);
+        super(desc);
         this.initiator = initiator;
         this.src = src;
         this.dst = dst;
@@ -68,8 +67,7 @@ public class SyncRequest extends RepairMessage
         if (!(o instanceof SyncRequest))
             return false;
         SyncRequest req = (SyncRequest)o;
-        return messageType == req.messageType &&
-               desc.equals(req.desc) &&
+        return desc.equals(req.desc) &&
                initiator.equals(req.initiator) &&
                src.equals(req.src) &&
                dst.equals(req.dst) &&
@@ -80,10 +78,10 @@ public class SyncRequest extends RepairMessage
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, desc, initiator, src, dst, ranges, previewKind);
+        return Objects.hash(desc, initiator, src, dst, ranges, previewKind);
     }
 
-    public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
+    public static final IVersionedSerializer<SyncRequest> serializer = new IVersionedSerializer<SyncRequest>()
     {
         public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException
         {
@@ -124,7 +122,7 @@ public class SyncRequest extends RepairMessage
             size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
             return size;
         }
-    }
+    };
 
     @Override
     public String toString()
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
similarity index 79%
rename from src/java/org/apache/cassandra/repair/messages/SyncComplete.java
rename to src/java/org/apache/cassandra/repair/messages/SyncResponse.java
index c51d1fd..e7e7985 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -34,10 +35,8 @@ import org.apache.cassandra.streaming.SessionSummary;
  *
  * @since 2.0
  */
-public class SyncComplete extends RepairMessage
+public class SyncResponse extends RepairMessage
 {
-    public static final MessageSerializer serializer = new SyncCompleteSerializer();
-
     /** nodes that involved in this sync */
     public final SyncNodePair nodes;
     /** true if sync success, false otherwise */
@@ -45,17 +44,17 @@ public class SyncComplete extends RepairMessage
 
     public final List<SessionSummary> summaries;
 
-    public SyncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
+    public SyncResponse(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
     {
-        super(Type.SYNC_COMPLETE, desc);
+        super(desc);
         this.nodes = nodes;
         this.success = success;
         this.summaries = summaries;
     }
 
-    public SyncComplete(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
+    public SyncResponse(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
     {
-        super(Type.SYNC_COMPLETE, desc);
+        super(desc);
         this.summaries = summaries;
         this.nodes = new SyncNodePair(endpoint1, endpoint2);
         this.success = success;
@@ -64,11 +63,10 @@ public class SyncComplete extends RepairMessage
     @Override
     public boolean equals(Object o)
     {
-        if (!(o instanceof SyncComplete))
+        if (!(o instanceof SyncResponse))
             return false;
-        SyncComplete other = (SyncComplete)o;
-        return messageType == other.messageType &&
-               desc.equals(other.desc) &&
+        SyncResponse other = (SyncResponse)o;
+        return desc.equals(other.desc) &&
                success == other.success &&
                nodes.equals(other.nodes) &&
                summaries.equals(other.summaries);
@@ -77,12 +75,12 @@ public class SyncComplete extends RepairMessage
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, desc, success, nodes, summaries);
+        return Objects.hash(desc, success, nodes, summaries);
     }
 
-    private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
+    public static final IVersionedSerializer<SyncResponse> serializer = new IVersionedSerializer<SyncResponse>()
     {
-        public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException
+        public void serialize(SyncResponse message, DataOutputPlus out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
             SyncNodePair.serializer.serialize(message.nodes, out, version);
@@ -95,7 +93,7 @@ public class SyncComplete extends RepairMessage
             }
         }
 
-        public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
+        public SyncResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, version);
@@ -108,10 +106,10 @@ public class SyncComplete extends RepairMessage
                 summaries.add(SessionSummary.serializer.deserialize(in, version));
             }
 
-            return new SyncComplete(desc, nodes, success, summaries);
+            return new SyncResponse(desc, nodes, success, summaries);
         }
 
-        public long serializedSize(SyncComplete message, int version)
+        public long serializedSize(SyncResponse message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
             size += SyncNodePair.serializer.serializedSize(message.nodes, version);
@@ -125,5 +123,5 @@ public class SyncComplete extends RepairMessage
 
             return size;
         }
-    }
+    };
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
index 6466244..f9a1f4e 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationRequest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair.messages;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -31,13 +32,11 @@ import org.apache.cassandra.repair.RepairJobDesc;
  */
 public class ValidationRequest extends RepairMessage
 {
-    public static MessageSerializer serializer = new ValidationRequestSerializer();
-
     public final int nowInSec;
 
     public ValidationRequest(RepairJobDesc desc, int nowInSec)
     {
-        super(Type.VALIDATION_REQUEST, desc);
+        super(desc);
         this.nowInSec = nowInSec;
     }
 
@@ -65,7 +64,7 @@ public class ValidationRequest extends RepairMessage
         return nowInSec;
     }
 
-    public static class ValidationRequestSerializer implements MessageSerializer<ValidationRequest>
+    public static final IVersionedSerializer<ValidationRequest> serializer = new IVersionedSerializer<ValidationRequest>()
     {
         public void serialize(ValidationRequest message, DataOutputPlus out, int version) throws IOException
         {
@@ -85,5 +84,5 @@ public class ValidationRequest extends RepairMessage
             size += TypeSizes.sizeof(message.nowInSec);
             return size;
         }
-    }
+    };
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationResponse.java
similarity index 70%
rename from src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
rename to src/java/org/apache/cassandra/repair/messages/ValidationResponse.java
index b8aa736..d9f4467 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationResponse.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -31,22 +32,20 @@ import org.apache.cassandra.utils.MerkleTrees;
  *
  * @since 2.0
  */
-public class ValidationComplete extends RepairMessage
+public class ValidationResponse extends RepairMessage
 {
-    public static MessageSerializer serializer = new ValidationCompleteSerializer();
-
     /** Merkle hash tree response. Null if validation failed. */
     public final MerkleTrees trees;
 
-    public ValidationComplete(RepairJobDesc desc)
+    public ValidationResponse(RepairJobDesc desc)
     {
-        super(Type.VALIDATION_COMPLETE, desc);
+        super(desc);
         trees = null;
     }
 
-    public ValidationComplete(RepairJobDesc desc, MerkleTrees trees)
+    public ValidationResponse(RepairJobDesc desc, MerkleTrees trees)
     {
-        super(Type.VALIDATION_COMPLETE, desc);
+        super(desc);
         assert trees != null;
         this.trees = trees;
     }
@@ -57,34 +56,33 @@ public class ValidationComplete extends RepairMessage
     }
 
     /**
-     * @return a new {@link ValidationComplete} instance with all trees moved off heap, or {@code this}
+     * @return a new {@link ValidationResponse} instance with all trees moved off heap, or {@code this}
      * if it's a failure response.
      */
-    public ValidationComplete tryMoveOffHeap() throws IOException
+    public ValidationResponse tryMoveOffHeap() throws IOException
     {
-        return trees == null ? this : new ValidationComplete(desc, trees.tryMoveOffHeap());
+        return trees == null ? this : new ValidationResponse(desc, trees.tryMoveOffHeap());
     }
 
     @Override
     public boolean equals(Object o)
     {
-        if (!(o instanceof ValidationComplete))
+        if (!(o instanceof ValidationResponse))
             return false;
 
-        ValidationComplete other = (ValidationComplete)o;
-        return messageType == other.messageType &&
-               desc.equals(other.desc);
+        ValidationResponse other = (ValidationResponse)o;
+        return desc.equals(other.desc);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, desc);
+        return Objects.hash(desc);
     }
 
-    private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
+    public static final IVersionedSerializer<ValidationResponse> serializer = new IVersionedSerializer<ValidationResponse>()
     {
-        public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException
+        public void serialize(ValidationResponse message, DataOutputPlus out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
             out.writeBoolean(message.success());
@@ -92,7 +90,7 @@ public class ValidationComplete extends RepairMessage
                 MerkleTrees.serializer.serialize(message.trees, out, version);
         }
 
-        public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException
+        public ValidationResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             boolean success = in.readBoolean();
@@ -100,13 +98,13 @@ public class ValidationComplete extends RepairMessage
             if (success)
             {
                 MerkleTrees trees = MerkleTrees.serializer.deserialize(in, version);
-                return new ValidationComplete(desc, trees);
+                return new ValidationResponse(desc, trees);
             }
 
-            return new ValidationComplete(desc);
+            return new ValidationResponse(desc);
         }
 
-        public long serializedSize(ValidationComplete message, int version)
+        public long serializedSize(ValidationResponse message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
             size += TypeSizes.sizeof(message.success());
@@ -114,5 +112,5 @@ public class ValidationComplete extends RepairMessage
                 size += MerkleTrees.serializer.serializedSize(message.trees, version);
             return size;
         }
-    }
+    };
 }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 409d799..6f4c474 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -78,7 +78,7 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.transform;
-import static org.apache.cassandra.net.Verb.REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.PREPARE_MSG;
 
 /**
  * ActiveRepairService is the starting point for manual "active" repairs.
@@ -112,8 +112,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
     private boolean registeredForEndpointChanges = false;
 
-    public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
-
     private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
     // singleton enforcement
     public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
@@ -437,7 +435,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             if (FailureDetector.instance.isAlive(neighbour))
             {
                 PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
-                Message<RepairMessage> msg = Message.out(REPAIR_REQ, message);
+                Message<RepairMessage> msg = Message.out(PREPARE_MSG, message);
                 MessagingService.instance().sendWithCallback(msg, neighbour, callback);
             }
             else
@@ -527,21 +525,21 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return parentRepairSessions.remove(parentSessionId);
     }
 
-    public void handleMessage(InetAddressAndPort endpoint, RepairMessage message)
+    public void handleMessage(Message<? extends RepairMessage> message)
     {
-        RepairJobDesc desc = message.desc;
+        RepairJobDesc desc = message.payload.desc;
         RepairSession session = sessions.get(desc.sessionId);
         if (session == null)
             return;
-        switch (message.messageType)
+        switch (message.verb())
         {
-            case VALIDATION_COMPLETE:
-                ValidationComplete validation = (ValidationComplete) message;
-                session.validationComplete(desc, endpoint, validation.trees);
+            case VALIDATION_RSP:
+                ValidationResponse validation = (ValidationResponse) message.payload;
+                session.validationComplete(desc, message.from(), validation.trees);
                 break;
-            case SYNC_COMPLETE:
+            case SYNC_RSP:
                 // one of replica is synced.
-                SyncComplete sync = (SyncComplete) message;
+                SyncResponse sync = (SyncResponse) message.payload;
                 session.syncComplete(desc, sync.nodes, sync.success, sync.summaries);
                 break;
             default:
diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin
index 849faf2..4e8caa6 100644
Binary files a/test/data/serialization/4.0/service.SyncComplete.bin and b/test/data/serialization/4.0/service.SyncComplete.bin differ
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
index f5f2e3d..b0cc44e 100644
Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ
diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin
index 3f5b318..7402c9e 100644
Binary files a/test/data/serialization/4.0/service.ValidationComplete.bin and b/test/data/serialization/4.0/service.ValidationComplete.bin differ
diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin
index 72d4a1f..fa4a913 100644
Binary files a/test/data/serialization/4.0/service.ValidationRequest.bin and b/test/data/serialization/4.0/service.ValidationRequest.bin differ
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index b84adaa..068544d 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -181,14 +182,14 @@ public class RepairJobTest
         assertEquals(0, result.stats.size());
 
         // RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done
-        List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+        List<Verb> expectedTypes = new ArrayList<>();
         for (int i = 0; i < 3; i++)
-            expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+            expectedTypes.add(Verb.SNAPSHOT_MSG);
         for (int i = 0; i < 3; i++)
-            expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+            expectedTypes.add(Verb.VALIDATION_REQ);
 
         assertEquals(expectedTypes, observedMessages.stream()
-                                                    .map(k -> ((RepairMessage) k.payload).messageType)
+                                                    .map(Message::verb)
                                                     .collect(Collectors.toList()));
     }
 
@@ -251,7 +252,7 @@ public class RepairJobTest
         assertTrue(results.stream().allMatch(s -> s.numberOfDifferences == 1));
 
         assertEquals(2, messages.size());
-        assertTrue(messages.stream().allMatch(m -> ((RepairMessage) m.payload).messageType == RepairMessage.Type.SYNC_REQUEST));
+        assertTrue(messages.stream().allMatch(m -> m.verb() == Verb.SYNC_REQ));
     }
 
     @Test
@@ -800,17 +801,16 @@ public class RepairJobTest
                 messageCapture.add(message);
             }
 
-            RepairMessage rm = (RepairMessage) message.payload;
-            switch (rm.messageType)
+            switch (message.verb())
             {
-                case SNAPSHOT:
+                case SNAPSHOT_MSG:
                     MessagingService.instance().callbacks.removeAndRespond(message.id(), to, message.emptyResponse());
                     break;
-                case VALIDATION_REQUEST:
+                case VALIDATION_REQ:
                     session.validationComplete(sessionJobDesc, to, mockTrees.get(to));
                     break;
-                case SYNC_REQUEST:
-                    SyncRequest syncRequest = (SyncRequest) rm;
+                case SYNC_REQ:
+                    SyncRequest syncRequest = (SyncRequest) message.payload;
                     session.syncComplete(sessionJobDesc, new SyncNodePair(syncRequest.src, syncRequest.dst),
                                          true, Collections.emptyList());
                     break;
diff --git a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
index 7f48788..cba64ae 100644
--- a/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/SymmetricRemoteSyncTaskTest.java
@@ -48,7 +48,7 @@ public class SymmetricRemoteSyncTaskTest extends AbstractRepairTest
         InetAddressAndPort sentTo = null;
 
         @Override
-        void sendRequest(RepairMessage request, InetAddressAndPort to)
+        void sendRequest(SyncRequest request, InetAddressAndPort to)
         {
             Assert.assertNull(sentMessage);
             Assert.assertNotNull(request);
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 9e848a9..a288edb 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -51,8 +51,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.RepairMessage;
-import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.repair.messages.ValidationResponse;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -131,12 +130,11 @@ public class ValidatorTest
         assertNotNull(tree.hash(new Range<>(min, min)));
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(Verb.REPAIR_REQ, message.verb());
-        RepairMessage m = (RepairMessage) message.payload;
-        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(Verb.VALIDATION_RSP, message.verb());
+        ValidationResponse m = (ValidationResponse) message.payload;
         assertEquals(desc, m.desc);
-        assertTrue(((ValidationComplete) m).success());
-        assertNotNull(((ValidationComplete) m).trees);
+        assertTrue(m.success());
+        assertNotNull(m.trees);
     }
 
 
@@ -154,12 +152,11 @@ public class ValidatorTest
         validator.fail();
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(Verb.REPAIR_REQ, message.verb());
-        RepairMessage m = (RepairMessage) message.payload;
-        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(Verb.VALIDATION_RSP, message.verb());
+        ValidationResponse m = (ValidationResponse) message.payload;
         assertEquals(desc, m.desc);
-        assertFalse(((ValidationComplete) m).success());
-        assertNull(((ValidationComplete) m).trees);
+        assertFalse(m.success());
+        assertNull(m.trees);
     }
 
     @Test
@@ -214,19 +211,17 @@ public class ValidatorTest
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-        assertEquals(Verb.REPAIR_REQ, message.verb());
-        RepairMessage m = (RepairMessage) message.payload;
-        assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
+        assertEquals(Verb.VALIDATION_RSP, message.verb());
+        ValidationResponse m = (ValidationResponse) message.payload;
         assertEquals(desc, m.desc);
-        assertTrue(((ValidationComplete) m).success());
-        MerkleTrees trees = ((ValidationComplete) m).trees;
+        assertTrue(m.success());
 
-        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
+        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = m.trees.iterator();
         while (iterator.hasNext())
         {
             assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
         }
-        assertEquals(trees.rowCount(), n);
+        assertEquals(m.trees.rowCount(), n);
     }
 
     /*
@@ -273,7 +268,7 @@ public class ValidatorTest
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-        MerkleTrees trees = ((ValidationComplete) message.payload).trees;
+        MerkleTrees trees = ((ValidationResponse) message.payload).trees;
 
         Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
         int numTrees = 0;
@@ -335,7 +330,7 @@ public class ValidatorTest
         ValidationManager.instance.submitValidation(cfs, validator);
 
         Message message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-        MerkleTrees trees = ((ValidationComplete) message.payload).trees;
+        MerkleTrees trees = ((ValidationResponse) message.payload).trees;
 
         // Should have 4 trees each with a depth of on average 10 (since each range should have gotten 0.25 megabytes)
         Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator();
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
index 237e9a8..c9fd913 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 
 import static org.apache.cassandra.net.MockMessagingService.all;
-import static org.apache.cassandra.net.MockMessagingService.payload;
 import static org.apache.cassandra.net.MockMessagingService.to;
 import static org.apache.cassandra.net.MockMessagingService.verb;
 import static org.junit.Assert.fail;
@@ -275,45 +274,36 @@ public class CoordinatorMessagingTest extends AbstractRepairTest
                                               Collection<InetAddressAndPort> timeout,
                                               Function<PrepareConsistentRequest, UUID> sessionIdFunc)
     {
-        return MockMessagingService.when(
-        all(verb(Verb.REPAIR_REQ),
-            payload((p) -> p instanceof PrepareConsistentRequest))
-        ).respond((msgOut, to) ->
-                  {
-                      if(timeout.contains(to)) return null;
-                      else return Message.out(Verb.REPAIR_REQ, new PrepareConsistentResponse(sessionIdFunc.apply((PrepareConsistentRequest) msgOut.payload), to, !failed.contains(to)));
-                  });
+        return MockMessagingService.when(verb(Verb.PREPARE_CONSISTENT_REQ)).respond((msgOut, to) ->
+        {
+            if (timeout.contains(to))
+                return null;
+
+            return Message.out(Verb.PREPARE_CONSISTENT_RSP,
+                               new PrepareConsistentResponse(sessionIdFunc.apply((PrepareConsistentRequest) msgOut.payload), to, !failed.contains(to)));
+        });
     }
 
     private MockMessagingSpy createFinalizeSpy(Collection<InetAddressAndPort> failed,
                                                Collection<InetAddressAndPort> timeout)
     {
-        return MockMessagingService.when(
-        all(verb(Verb.REPAIR_REQ),
-            payload((p) -> p instanceof FinalizePropose))
-        ).respond((msgOut, to) ->
-                  {
-                      if(timeout.contains(to)) return null;
-                      else return Message.out(Verb.REPAIR_REQ,
-                                                  new FinalizePromise(((FinalizePropose) msgOut.payload).sessionID, to, !failed.contains(to)));
-                  });
+        return MockMessagingService.when(verb(Verb.FINALIZE_PROPOSE_MSG)).respond((msgOut, to) ->
+        {
+            if (timeout.contains(to))
+                return null;
+
+            return Message.out(Verb.FINALIZE_PROMISE_MSG, new FinalizePromise(((FinalizePropose) msgOut.payload).sessionID, to, !failed.contains(to)));
+        });
     }
 
     private MockMessagingSpy createCommitSpy()
     {
-        return MockMessagingService.when(
-            all(verb(Verb.REPAIR_REQ),
-                payload((p) -> p instanceof FinalizeCommit))
-        ).dontReply();
+        return MockMessagingService.when(verb(Verb.FINALIZE_COMMIT_MSG)).dontReply();
     }
 
     private MockMessagingSpy createFailSessionSpy(Collection<InetAddressAndPort> participants)
     {
-        return MockMessagingService.when(
-            all(verb(Verb.REPAIR_REQ),
-                payload((p) -> p instanceof FailSession),
-                to(participants::contains))
-        ).dontReply();
+        return MockMessagingService.when(all(verb(Verb.FAILED_SESSION_MSG), to(participants::contains))).dontReply();
     }
 
     private static RepairSessionResult createResult(CoordinatorSession coordinator)
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index c6980fe..1cee312 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -35,6 +35,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.messages.FailSession;
@@ -93,13 +94,13 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
         Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
 
-        protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+        protected void sendMessage(InetAddressAndPort destination, Message<RepairMessage> message)
         {
             if (!sentMessages.containsKey(destination))
             {
                 sentMessages.put(destination, new ArrayList<>());
             }
-            sentMessages.get(destination).add(message);
+            sentMessages.get(destination).add(message.payload);
         }
 
         Runnable onSetRepairing = null;
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index a6b4fe2..15fd1fc 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
@@ -123,13 +124,14 @@ public class LocalSessionTest extends AbstractRepairTest
     static class InstrumentedLocalSessions extends LocalSessions
     {
         Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
-        protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
+
+        protected void sendMessage(InetAddressAndPort destination, Message<? extends RepairMessage> message)
         {
             if (!sentMessages.containsKey(destination))
             {
                 sentMessages.put(destination, new ArrayList<>());
             }
-            sentMessages.get(destination).add(message);
+            sentMessages.get(destination).add(message.payload);
         }
 
         SettableFuture<Object> prepareSessionFuture = null;
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index d583d85..fa037a0 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -118,7 +118,7 @@ public class RepairMessageSerializationsTest
     @Test
     public void validationCompleteMessage_NoMerkleTree() throws IOException
     {
-        ValidationComplete deserialized = validationCompleteMessage(null);
+        ValidationResponse deserialized = validationCompleteMessage(null);
         Assert.assertNull(deserialized.trees);
     }
 
@@ -127,19 +127,19 @@ public class RepairMessageSerializationsTest
     {
         MerkleTrees trees = new MerkleTrees(Murmur3Partitioner.instance);
         trees.addMerkleTree(256, new Range<>(new LongToken(1000), new LongToken(1001)));
-        ValidationComplete deserialized = validationCompleteMessage(trees);
+        ValidationResponse deserialized = validationCompleteMessage(trees);
 
         // a simple check to make sure we got some merkle trees back.
         Assert.assertEquals(trees.size(), deserialized.trees.size());
     }
 
-    private ValidationComplete validationCompleteMessage(MerkleTrees trees) throws IOException
+    private ValidationResponse validationCompleteMessage(MerkleTrees trees) throws IOException
     {
         RepairJobDesc jobDesc = buildRepairJobDesc();
-        ValidationComplete msg = trees == null ?
-                                 new ValidationComplete(jobDesc) :
-                                 new ValidationComplete(jobDesc, trees);
-        ValidationComplete deserialized = serializeRoundTrip(msg, ValidationComplete.serializer);
+        ValidationResponse msg = trees == null ?
+                                 new ValidationResponse(jobDesc) :
+                                 new ValidationResponse(jobDesc, trees);
+        ValidationResponse deserialized = serializeRoundTrip(msg, ValidationResponse.serializer);
         return deserialized;
     }
 
@@ -164,8 +164,8 @@ public class RepairMessageSerializationsTest
                                          Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)),
                                          Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10))
         ));
-        SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new SyncNodePair(src, dst), true, summaries);
-        serializeRoundTrip(msg, SyncComplete.serializer);
+        SyncResponse msg = new SyncResponse(buildRepairJobDesc(), new SyncNodePair(src, dst), true, summaries);
+        serializeRoundTrip(msg, SyncResponse.serializer);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
index d876139..fedf498 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializerTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -37,16 +38,16 @@ public class RepairMessageSerializerTest
 {
     private static int MS_VERSION = MessagingService.current_version;
 
-    static RepairMessage serdes(RepairMessage message)
+    private static <T extends RepairMessage> T serdes(IVersionedSerializer<T> serializer, T message)
     {
-        int expectedSize = (int) RepairMessage.serializer.serializedSize(message, MS_VERSION);
+        int expectedSize = (int) serializer.serializedSize(message, MS_VERSION);
         try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
         {
-            RepairMessage.serializer.serialize(message, out, MS_VERSION);
+            serializer.serialize(message, out, MS_VERSION);
             Assert.assertEquals(expectedSize, out.buffer().limit());
             try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
             {
-                return RepairMessage.serializer.deserialize(in, MS_VERSION);
+                return serializer.deserialize(in, MS_VERSION);
             }
         }
         catch (IOException e)
@@ -62,54 +63,50 @@ public class RepairMessageSerializerTest
         InetAddressAndPort peer1 = InetAddressAndPort.getByName("10.0.0.2");
         InetAddressAndPort peer2 = InetAddressAndPort.getByName("10.0.0.3");
         InetAddressAndPort peer3 = InetAddressAndPort.getByName("10.0.0.4");
-        RepairMessage expected = new PrepareConsistentRequest(UUIDGen.getTimeUUID(),
-                                                              coordinator,
-                                                              Sets.newHashSet(peer1, peer2, peer3));
-        RepairMessage actual = serdes(expected);
+        PrepareConsistentRequest expected =
+            new PrepareConsistentRequest(UUIDGen.getTimeUUID(), coordinator, Sets.newHashSet(peer1, peer2, peer3));
+        PrepareConsistentRequest actual = serdes(PrepareConsistentRequest.serializer, expected);
         Assert.assertEquals(expected, actual);
     }
 
     @Test
     public void prepareConsistentResponse() throws Exception
     {
-        RepairMessage expected = new PrepareConsistentResponse(UUIDGen.getTimeUUID(),
-                                                               InetAddressAndPort.getByName("10.0.0.2"),
-                                                               true);
-        RepairMessage actual = serdes(expected);
+        PrepareConsistentResponse expected =
+            new PrepareConsistentResponse(UUIDGen.getTimeUUID(), InetAddressAndPort.getByName("10.0.0.2"), true);
+        PrepareConsistentResponse actual = serdes(PrepareConsistentResponse.serializer, expected);
         Assert.assertEquals(expected, actual);
     }
 
     @Test
     public void failSession() throws Exception
     {
-        RepairMessage expected = new FailSession(UUIDGen.getTimeUUID());
-        RepairMessage actual = serdes(expected);
+        FailSession expected = new FailSession(UUIDGen.getTimeUUID());
+        FailSession actual = serdes(FailSession.serializer, expected);
         Assert.assertEquals(expected, actual);;
     }
 
     @Test
     public void finalizeCommit() throws Exception
     {
-        RepairMessage expected = new FinalizeCommit(UUIDGen.getTimeUUID());
-        RepairMessage actual = serdes(expected);
+        FinalizeCommit expected = new FinalizeCommit(UUIDGen.getTimeUUID());
+        FinalizeCommit actual = serdes(FinalizeCommit.serializer, expected);
         Assert.assertEquals(expected, actual);;
     }
 
     @Test
     public void finalizePromise() throws Exception
     {
-        RepairMessage expected = new FinalizePromise(UUIDGen.getTimeUUID(),
-                                                     InetAddressAndPort.getByName("10.0.0.2"),
-                                                     true);
-        RepairMessage actual = serdes(expected);
+        FinalizePromise expected = new FinalizePromise(UUIDGen.getTimeUUID(), InetAddressAndPort.getByName("10.0.0.2"), true);
+        FinalizePromise actual = serdes(FinalizePromise.serializer, expected);
         Assert.assertEquals(expected, actual);
     }
 
     @Test
     public void finalizePropose() throws Exception
     {
-        RepairMessage expected = new FinalizePropose(UUIDGen.getTimeUUID());
-        RepairMessage actual = serdes(expected);
+        FinalizePropose expected = new FinalizePropose(UUIDGen.getTimeUUID());
+        FinalizePropose actual = serdes(FinalizePropose.serializer, expected);
         Assert.assertEquals(expected, actual);;
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 273d233..0a5a023 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -38,10 +38,10 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.repair.SyncNodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
@@ -79,14 +79,14 @@ public class SerializationsTest extends AbstractSerializationsTester
         partitionerSwitcher.close();
     }
 
-    private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
+    private <T extends RepairMessage> void testRepairMessageWrite(String fileName, IVersionedSerializer<T> serializer, T... messages) throws IOException
     {
         try (DataOutputStreamPlus out = getOutput(fileName))
         {
-            for (RepairMessage message : messages)
+            for (T message : messages)
             {
-                testSerializedSize(message, RepairMessage.serializer);
-                RepairMessage.serializer.serialize(message, out, getVersion());
+                testSerializedSize(message, serializer);
+                serializer.serialize(message, out, getVersion());
             }
         }
     }
@@ -94,7 +94,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     private void testValidationRequestWrite() throws IOException
     {
         ValidationRequest message = new ValidationRequest(DESC, 1234);
-        testRepairMessageWrite("service.ValidationRequest.bin", message);
+        testRepairMessageWrite("service.ValidationRequest.bin", ValidationRequest.serializer, message);
     }
 
     @Test
@@ -105,10 +105,9 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         try (DataInputStreamPlus in = getInput("service.ValidationRequest.bin"))
         {
-            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.VALIDATION_REQUEST;
+            ValidationRequest message = ValidationRequest.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
-            assert ((ValidationRequest) message).nowInSec == 1234;
+            assert message.nowInSec == 1234;
         }
     }
 
@@ -121,7 +120,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         // empty validation
         mt.addMerkleTree((int) Math.pow(2, 15), FULL_RANGE);
         Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddressAndPort(), -1, PreviewKind.NONE);
-        ValidationComplete c0 = new ValidationComplete(DESC, mt);
+        ValidationResponse c0 = new ValidationResponse(DESC, mt);
 
         // validation with a tree
         mt = new MerkleTrees(p);
@@ -129,12 +128,12 @@ public class SerializationsTest extends AbstractSerializationsTester
         for (int i = 0; i < 10; i++)
             mt.split(p.getRandomToken());
         Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddressAndPort(), -1, PreviewKind.NONE);
-        ValidationComplete c1 = new ValidationComplete(DESC, mt);
+        ValidationResponse c1 = new ValidationResponse(DESC, mt);
 
         // validation failed
-        ValidationComplete c3 = new ValidationComplete(DESC);
+        ValidationResponse c3 = new ValidationResponse(DESC);
 
-        testRepairMessageWrite("service.ValidationComplete.bin", c0, c1, c3);
+        testRepairMessageWrite("service.ValidationComplete.bin", ValidationResponse.serializer, c0, c1, c3);
     }
 
     @Test
@@ -146,28 +145,25 @@ public class SerializationsTest extends AbstractSerializationsTester
         try (DataInputStreamPlus in = getInput("service.ValidationComplete.bin"))
         {
             // empty validation
-            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+            ValidationResponse message = ValidationResponse.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
 
-            assert ((ValidationComplete) message).success();
-            assert ((ValidationComplete) message).trees != null;
+            assert message.success();
+            assert message.trees != null;
 
             // validation with a tree
-            message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+            message = ValidationResponse.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
 
-            assert ((ValidationComplete) message).success();
-            assert ((ValidationComplete) message).trees != null;
+            assert message.success();
+            assert message.trees != null;
 
             // failed validation
-            message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
+            message = ValidationResponse.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
 
-            assert !((ValidationComplete) message).success();
-            assert ((ValidationComplete) message).trees == null;
+            assert !message.success();
+            assert message.trees == null;
         }
     }
 
@@ -178,7 +174,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         InetAddressAndPort dest = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.3", PORT);
 
         SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE);
-        testRepairMessageWrite("service.SyncRequest.bin", message);
+        testRepairMessageWrite("service.SyncRequest.bin", SyncRequest.serializer, message);
     }
 
     @Test
@@ -193,13 +189,12 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         try (DataInputStreamPlus in = getInput("service.SyncRequest.bin"))
         {
-            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.SYNC_REQUEST;
+            SyncRequest message = SyncRequest.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
-            assert local.equals(((SyncRequest) message).initiator);
-            assert src.equals(((SyncRequest) message).src);
-            assert dest.equals(((SyncRequest) message).dst);
-            assert ((SyncRequest) message).ranges.size() == 1 && ((SyncRequest) message).ranges.contains(FULL_RANGE);
+            assert local.equals(message.initiator);
+            assert src.equals(message.src);
+            assert dest.equals(message.dst);
+            assert message.ranges.size() == 1 && message.ranges.contains(FULL_RANGE);
         }
     }
 
@@ -213,11 +208,11 @@ public class SerializationsTest extends AbstractSerializationsTester
                                          Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)),
                                          Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10))
         ));
-        SyncComplete success = new SyncComplete(DESC, src, dest, true, summaries);
+        SyncResponse success = new SyncResponse(DESC, src, dest, true, summaries);
         // sync fail
-        SyncComplete fail = new SyncComplete(DESC, src, dest, false, Collections.emptyList());
+        SyncResponse fail = new SyncResponse(DESC, src, dest, false, Collections.emptyList());
 
-        testRepairMessageWrite("service.SyncComplete.bin", success, fail);
+        testRepairMessageWrite("service.SyncComplete.bin", SyncResponse.serializer, success, fail);
     }
 
     @Test
@@ -233,22 +228,20 @@ public class SerializationsTest extends AbstractSerializationsTester
         try (DataInputStreamPlus in = getInput("service.SyncComplete.bin"))
         {
             // success
-            RepairMessage message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.SYNC_COMPLETE;
+            SyncResponse message = SyncResponse.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
 
             System.out.println(nodes);
-            System.out.println(((SyncComplete) message).nodes);
-            assert nodes.equals(((SyncComplete) message).nodes);
-            assert ((SyncComplete) message).success;
+            System.out.println(message.nodes);
+            assert nodes.equals(message.nodes);
+            assert message.success;
 
             // fail
-            message = RepairMessage.serializer.deserialize(in, getVersion());
-            assert message.messageType == RepairMessage.Type.SYNC_COMPLETE;
+            message = SyncResponse.serializer.deserialize(in, getVersion());
             assert DESC.equals(message.desc);
 
-            assert nodes.equals(((SyncComplete) message).nodes);
-            assert !((SyncComplete) message).success;
+            assert nodes.equals(message.nodes);
+            assert !message.success;
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org