Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/08/26 22:25:56 UTC

[cassandra] branch trunk updated: Add test which validates that Message serializedSize(version) == serialize(out, version).length

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0697489  Add test which validates that Message serializedSize(version) == serialize(out, version).length
0697489 is described below

commit 0697489f7455fa078cb454776b98236f18f82dd4
Author: David Capwell <dc...@apache.org>
AuthorDate: Wed Aug 26 14:34:32 2020 -0700

    Add test which validates that Message serializedSize(version) == serialize(out, version).length
    
    patch by David Capwell; reviewed by Caleb Rackliffe, Jon Meredith for CASSANDRA-16064
---
 build.xml                                          |   2 +-
 lib/commons-lang3-3.1.jar                          | Bin 315805 -> 0 bytes
 lib/commons-lang3-3.11.jar                         | Bin 0 -> 577742 bytes
 src/java/org/apache/cassandra/db/ReadCommand.java  |  20 +-
 .../org/apache/cassandra/db/SchemaCQLHelper.java   |  22 ++
 .../apache/cassandra/db/marshal/AbstractType.java  |   2 +-
 .../org/apache/cassandra/db/marshal/EmptyType.java |   8 +
 .../org/apache/cassandra/db/marshal/UserType.java  |  45 ++-
 src/java/org/apache/cassandra/net/Message.java     |   2 +-
 .../org/apache/cassandra/net/MessagingService.java |  17 +-
 src/java/org/apache/cassandra/net/PingRequest.java |  27 +-
 src/java/org/apache/cassandra/schema/Schema.java   |  19 +-
 .../org/apache/cassandra/schema/TableMetadata.java |   2 +-
 .../cassandra/schema/TableMetadataProvider.java    |  24 ++
 .../cassandra/serializers/TimestampSerializer.java |  10 +-
 .../cassandra/serializers/UserTypeSerializer.java  |  13 +-
 .../org/apache/cassandra/utils/ByteArrayUtil.java  | 209 +++++++++++
 .../apache/cassandra/utils/FailingConsumer.java    |  42 +++
 .../cql3/validation/entities/TupleTypeTest.java    | 157 +++++++++
 .../cassandra/db/marshal/TimestampTypeTest.java    |  45 +++
 .../cassandra/db/marshal/TypeValidationTest.java   | 132 ++++++-
 .../net/MessageSerializationPropertyTest.java      | 144 ++++++++
 .../cassandra/utils/AbstractTypeGenerators.java    | 381 +++++++++++++++++++++
 .../apache/cassandra/utils/ByteArrayUtilTest.java  | 211 ++++++++++++
 .../cassandra/utils/CassandraGenerators.java       | 286 ++++++++++++++++
 .../cassandra/utils/FixedMonotonicClock.java       |  68 ++++
 .../org/apache/cassandra/utils/Generators.java     | 354 ++++++++++++++++++-
 .../org/apache/cassandra/utils/GeneratorsTest.java |  30 ++
 28 files changed, 2203 insertions(+), 69 deletions(-)

diff --git a/build.xml b/build.xml
index 7b67314..66a0d2e 100644
--- a/build.xml
+++ b/build.xml
@@ -563,7 +563,7 @@
           <dependency groupId="org.hdrhistogram" artifactId="HdrHistogram" version="2.1.9"/>
           <dependency groupId="commons-cli" artifactId="commons-cli" version="1.1"/>
           <dependency groupId="commons-codec" artifactId="commons-codec" version="1.9"/>
-          <dependency groupId="org.apache.commons" artifactId="commons-lang3" version="3.1"/>
+          <dependency groupId="org.apache.commons" artifactId="commons-lang3" version="3.11"/>
           <dependency groupId="org.apache.commons" artifactId="commons-math3" version="3.2"/>
           <dependency groupId="org.antlr" artifactId="antlr" version="3.5.2">
             <exclusion groupId="org.antlr" artifactId="stringtemplate"/>
diff --git a/lib/commons-lang3-3.1.jar b/lib/commons-lang3-3.1.jar
deleted file mode 100644
index a85e539..0000000
Binary files a/lib/commons-lang3-3.1.jar and /dev/null differ
diff --git a/lib/commons-lang3-3.11.jar b/lib/commons-lang3-3.11.jar
new file mode 100644
index 0000000..bbaa8a6
Binary files /dev/null and b/lib/commons-lang3-3.11.jar differ
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index ffdfc7c..60ddaa9 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -60,6 +61,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataProvider;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
@@ -904,8 +906,22 @@ public abstract class ReadCommand extends AbstractReadQuery
         }
     }
 
-    private static class Serializer implements IVersionedSerializer<ReadCommand>
+    @VisibleForTesting
+    public static class Serializer implements IVersionedSerializer<ReadCommand>
     {
+        private final TableMetadataProvider schema;
+
+        public Serializer()
+        {
+            this(Schema.instance);
+        }
+
+        @VisibleForTesting
+        public Serializer(TableMetadataProvider schema)
+        {
+            this.schema = Objects.requireNonNull(schema, "schema");
+        }
+
         private static int digestFlag(boolean isDigest)
         {
             return isDigest ? 0x01 : 0;
@@ -983,7 +999,7 @@ public abstract class ReadCommand extends AbstractReadQuery
 
             boolean hasIndex = hasIndex(flags);
             int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
-            TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
+            TableMetadata metadata = schema.getExistingTableMetadata(TableId.deserialize(in));
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
diff --git a/src/java/org/apache/cassandra/db/SchemaCQLHelper.java b/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
index 6f9e526..ded1692 100644
--- a/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
+++ b/src/java/org/apache/cassandra/db/SchemaCQLHelper.java
@@ -19,10 +19,13 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.schema.*;
@@ -32,6 +35,9 @@ import org.apache.cassandra.schema.*;
  */
 public class SchemaCQLHelper
 {
+    private static final Pattern EMPTY_TYPE_REGEX = Pattern.compile("empty", Pattern.LITERAL);
+    private static final String EMPTY_TYPE_QUOTED = Matcher.quoteReplacement("'org.apache.cassandra.db.marshal.EmptyType'");
+
     /**
      * Generates the DDL statement for a {@code schema.cql} snapshot file.
      */
@@ -155,4 +161,20 @@ public class SchemaCQLHelper
                                                                               UTF8Type.instance.getString(name),
                                                                               metadata)));
     }
+
+    /**
+     * Converts the type to a CQL type string.  This method special-cases empty types and UDTs so the string can be
+     * used in a CREATE statement.
+     *
+     * Special cases
+     * <ul>
+     *     <li>empty - replaced with 'org.apache.cassandra.db.marshal.EmptyType'.  "empty" is the CQL toString of the
+     *     type, but a column may not be declared as "empty"; the fully qualified class name is allowed</li>
+     *     <li>UserType - replaced with the equivalent TupleType</li>
+     * </ul>
+     */
+    public static String toCqlType(AbstractType<?> type)
+    {
+        return EMPTY_TYPE_REGEX.matcher(type.expandUserTypes().asCQL3Type().toString()).replaceAll(EMPTY_TYPE_QUOTED);
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 26146d7..0a34785 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -420,7 +420,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>, Assignm
 
     public long writtenLength(ByteBuffer value)
     {
-        assert value.hasRemaining();
+        assert value.hasRemaining() : "bytes should not be empty for type " + this;
         return valueLengthIfFixed() >= 0
              ? value.remaining()
              : TypeSizes.sizeofWithVIntLength(value);
diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
index 88d62c4..808402f 100644
--- a/src/java/org/apache/cassandra/db/marshal/EmptyType.java
+++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
@@ -121,6 +121,14 @@ public class EmptyType extends AbstractType<Void>
     }
 
     @Override
+    public long writtenLength(ByteBuffer value)
+    {
+        // the default implementation requires non-empty bytes, but this type always requires empty bytes, so special case it
+        validate(value);
+        return 0;
+    }
+
+    @Override
     public ByteBuffer readValue(DataInputPlus in)
     {
         return ByteBufferUtil.EMPTY_BYTE_BUFFER;
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index 3c023b7..dfc726d 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -24,6 +24,9 @@ import java.util.stream.Collectors;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
@@ -37,6 +40,7 @@ import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.cql3.ColumnIdentifier.maybeQuote;
 
 /**
  * A user defined type.
@@ -45,6 +49,10 @@ import static com.google.common.collect.Iterables.transform;
  */
 public class UserType extends TupleType implements SchemaElement
 {
+    private static final Logger logger = LoggerFactory.getLogger(UserType.class);
+
+    private static final ConflictBehavior CONFLICT_BEHAVIOR = ConflictBehavior.get();
+
     public final String keyspace;
     public final ByteBuffer name;
     private final List<FieldIdentifier> fieldNames;
@@ -67,7 +75,9 @@ public class UserType extends TupleType implements SchemaElement
         {
             String stringFieldName = fieldNames.get(i).toString();
             stringFieldNames.add(stringFieldName);
-            fieldSerializers.put(stringFieldName, fieldTypes.get(i).getSerializer());
+            TypeSerializer<?> existing = fieldSerializers.put(stringFieldName, fieldTypes.get(i).getSerializer());
+            if (existing != null)
+                CONFLICT_BEHAVIOR.onConflict(keyspace, getNameAsString(), stringFieldName);
         }
         this.serializer = new UserTypeSerializer(fieldSerializers);
     }
@@ -434,7 +444,7 @@ public class UserType extends TupleType implements SchemaElement
 
     public String getCqlTypeName()
     {
-        return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(getNameAsString()));
+        return String.format("%s.%s", maybeQuote(keyspace), maybeQuote(getNameAsString()));
     }
 
     @Override
@@ -490,4 +500,35 @@ public class UserType extends TupleType implements SchemaElement
 
         return builder.toString();
     }
+
+    private enum ConflictBehavior
+    {
+        LOG {
+            void onConflict(String keyspace, String name, String fieldName)
+            {
+                logger.error("Duplicate names found in UDT {}.{} for column {}",
+                             maybeQuote(keyspace), maybeQuote(name), maybeQuote(fieldName));
+            }
+        },
+        REJECT {
+            @Override
+            void onConflict(String keyspace, String name, String fieldName)
+            {
+
+                throw new AssertionError(String.format("Duplicate names found in UDT %s.%s for column %s; " +
+                                                       "to resolve set -D" + UDT_CONFLICT_BEHAVIOR + "=LOG on startup and remove the type",
+                                                       maybeQuote(keyspace), maybeQuote(name), maybeQuote(fieldName)));
+            }
+        };
+
+        private static final String UDT_CONFLICT_BEHAVIOR = "cassandra.type.udt.conflict_behavior";
+
+        abstract void onConflict(String keyspace, String name, String fieldName);
+
+        static ConflictBehavior get()
+        {
+            String value = System.getProperty(UDT_CONFLICT_BEHAVIOR, REJECT.name());
+            return ConflictBehavior.valueOf(value);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 01ba5d4..e0a2e7f 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -683,7 +683,7 @@ public class Message<T>
             // int cast cuts off the high-order half of the timestamp, which we can assume remains
             // the same between now and when the recipient reconstructs it.
             out.writeInt((int) approxTime.translate().toMillisSinceEpoch(header.createdAtNanos));
-            out.writeUnsignedVInt(1 + NANOSECONDS.toMillis(header.expiresAtNanos - header.createdAtNanos));
+            out.writeUnsignedVInt(NANOSECONDS.toMillis(header.expiresAtNanos - header.createdAtNanos));
             out.writeUnsignedVInt(header.verb.id);
             out.writeUnsignedVInt(header.flags);
             serializeParams(header.params, out, version);
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0827f78..c0f57f8 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.net;
 
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -45,7 +43,6 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static java.util.Collections.synchronizedList;
 import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.concurrent.Stage.MUTATION;
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 
@@ -210,6 +207,20 @@ public final class MessagingService extends MessagingServiceMBeanImpl
     static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
     static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
 
+    public enum Version
+    {
+        VERSION_30(10),
+        VERSION_3014(11),
+        VERSION_40(12);
+
+        public final int value;
+
+        Version(int value)
+        {
+            this.value = value;
+        }
+    }
+
     private static class MSHandle
     {
         public static final MessagingService instance = new MessagingService(false);
diff --git a/src/java/org/apache/cassandra/net/PingRequest.java b/src/java/org/apache/cassandra/net/PingRequest.java
index c02bd80..6b72547 100644
--- a/src/java/org/apache/cassandra/net/PingRequest.java
+++ b/src/java/org/apache/cassandra/net/PingRequest.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.net;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -30,7 +32,7 @@ import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
 /**
  * Indicates to the recipient which {@link ConnectionType} should be used for the response.
  */
-class PingRequest
+public class PingRequest
 {
     static final PingRequest forUrgent = new PingRequest(URGENT_MESSAGES);
     static final PingRequest forSmall  = new PingRequest(SMALL_MESSAGES);
@@ -43,6 +45,18 @@ class PingRequest
         this.connectionType = connectionType;
     }
 
+    @VisibleForTesting
+    public static PingRequest get(ConnectionType type)
+    {
+        switch (type)
+        {
+            case URGENT_MESSAGES: return forUrgent;
+            case  SMALL_MESSAGES: return forSmall;
+            case  LARGE_MESSAGES: return forLarge;
+            default: throw new IllegalArgumentException("Unsupported type: " + type);
+        }
+    }
+
     static IVersionedSerializer<PingRequest> serializer = new IVersionedSerializer<PingRequest>()
     {
         public void serialize(PingRequest t, DataOutputPlus out, int version) throws IOException
@@ -52,16 +66,7 @@ class PingRequest
 
         public PingRequest deserialize(DataInputPlus in, int version) throws IOException
         {
-            ConnectionType type = ConnectionType.fromId(in.readByte());
-
-            switch (type)
-            {
-                case URGENT_MESSAGES: return forUrgent;
-                case  SMALL_MESSAGES: return forSmall;
-                case  LARGE_MESSAGES: return forLarge;
-            }
-
-            throw new IllegalStateException();
+            return get(ConnectionType.fromId(in.readByte()));
         }
 
         public long serializedSize(PingRequest t, int version)
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index e2be6ee..0498993 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -51,7 +51,7 @@ import static java.lang.String.format;
 
 import static com.google.common.collect.Iterables.size;
 
-public final class Schema
+public final class Schema implements TableMetadataProvider
 {
     public static final Schema instance = new Schema();
 
@@ -417,6 +417,7 @@ public final class Schema
              : ksm.getTableOrViewNullable(table);
     }
 
+    @Override
     @Nullable
     public TableMetadata getTableMetadata(TableId id)
     {
@@ -445,22 +446,6 @@ public final class Schema
         return getTableMetadata(descriptor.ksname, descriptor.cfname);
     }
 
-    /**
-     * @throws UnknownTableException if the table couldn't be found in the metadata
-     */
-    public TableMetadata getExistingTableMetadata(TableId id) throws UnknownTableException
-    {
-        TableMetadata metadata = getTableMetadata(id);
-        if (metadata != null)
-            return metadata;
-
-        String message =
-            String.format("Couldn't find table with id %s. If a table was just created, this is likely due to the schema"
-                          + "not being fully propagated.  Please wait for schema agreement on table creation.",
-                          id);
-        throw new UnknownTableException(message, id);
-    }
-
     /* Function helpers */
 
     /**
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 4c917dd..7880c2a 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -911,7 +911,7 @@ public final class TableMetadata implements SchemaElement
             return this;
         }
 
-        Builder addColumns(Iterable<ColumnMetadata> columns)
+        public Builder addColumns(Iterable<ColumnMetadata> columns)
         {
             columns.forEach(this::addColumn);
             return this;
diff --git a/src/java/org/apache/cassandra/schema/TableMetadataProvider.java b/src/java/org/apache/cassandra/schema/TableMetadataProvider.java
new file mode 100644
index 0000000..7c5ae8a
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/TableMetadataProvider.java
@@ -0,0 +1,24 @@
+package org.apache.cassandra.schema;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.exceptions.UnknownTableException;
+
+public interface TableMetadataProvider
+{
+    @Nullable
+    TableMetadata getTableMetadata(TableId id);
+
+    default TableMetadata getExistingTableMetadata(TableId id) throws UnknownTableException
+    {
+        TableMetadata metadata = getTableMetadata(id);
+        if (metadata != null)
+            return metadata;
+
+        String message =
+            String.format("Couldn't find table with id %s. If a table was just created, this is likely due to the schema"
+                          + "not being fully propagated.  Please wait for schema agreement on table creation.",
+                          id);
+        throw new UnknownTableException(message, id);
+    }
+}
diff --git a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
index 49eb603..ba35f64 100644
--- a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
@@ -101,14 +101,6 @@ public class TimestampSerializer implements TypeSerializer<Date>
 
     private static final Pattern timestampPattern = Pattern.compile("^-?\\d+$");
 
-    private static final FastThreadLocal<SimpleDateFormat> FORMATTER = new FastThreadLocal<SimpleDateFormat>()
-    {
-        protected SimpleDateFormat initialValue()
-        {
-            return new SimpleDateFormat("yyyy-MM-dd HH:mmXX");
-        }
-    };
-
     private static final FastThreadLocal<SimpleDateFormat> FORMATTER_UTC = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
@@ -188,7 +180,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
 
     public String toString(Date value)
     {
-        return value == null ? "" : FORMATTER.get().format(value);
+        return toStringUTC(value);
     }
 
     public String toStringUTC(Date value)
diff --git a/src/java/org/apache/cassandra/serializers/UserTypeSerializer.java b/src/java/org/apache/cassandra/serializers/UserTypeSerializer.java
index 472e39b..7af6c4a 100644
--- a/src/java/org/apache/cassandra/serializers/UserTypeSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/UserTypeSerializer.java
@@ -36,9 +36,10 @@ public class UserTypeSerializer extends BytesSerializer
     public void validate(ByteBuffer bytes) throws MarshalException
     {
         ByteBuffer input = bytes.duplicate();
-        int i = 0;
+        int i = -1; // the loop increments first, so i becomes 0 on the first iteration and matches the field index
         for (Entry<String, TypeSerializer<?>> entry : fields.entrySet())
         {
+            i++;
             // we allow the input to have less fields than declared so as to support field addition.
             if (!input.hasRemaining())
                 return;
@@ -56,8 +57,14 @@ public class UserTypeSerializer extends BytesSerializer
                 throw new MarshalException(String.format("Not enough bytes to read %dth field %s", i, entry.getKey()));
 
             ByteBuffer field = ByteBufferUtil.readBytes(input, size);
-            entry.getValue().validate(field);
-            i++;
+            try
+            {
+                entry.getValue().validate(field);
+            }
+            catch (MarshalException e)
+            {
+                throw new MarshalException(String.format("Failure validating the %dth field %s; %s", i, entry.getKey(), e.getMessage()), e);
+            }
         }
 
         // We're allowed to get less fields than declared, but not more
diff --git a/src/java/org/apache/cassandra/utils/ByteArrayUtil.java b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
new file mode 100644
index 0000000..b97a1c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ByteArrayUtil.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+public class ByteArrayUtil
+{
+    private ByteArrayUtil()
+    {
+
+    }
+
+    /*
+     * Methods for unpacking primitive values from byte arrays starting at
+     * given offsets.
+     */
+
+    public static boolean getBoolean(byte[] b)
+    {
+        return getBoolean(b, 0);
+    }
+
+    public static boolean getBoolean(byte[] b, int off)
+    {
+        return b[off] != 0;
+    }
+
+    /**
+     * @return signed short encoded as big endian
+     */
+    public static short getShort(byte[] b)
+    {
+        return getShort(b, 0);
+    }
+
+    /**
+     * @return signed short from the given offset encoded as big endian
+     */
+    public static short getShort(byte[] b, int off)
+    {
+        return (short) (b[off    ] << 8 |
+                        b[off + 1] & 255);
+    }
+
+    /**
+     * @return signed int encoded as big endian
+     */
+    public static int getInt(byte[] b)
+    {
+        return getInt(b, 0);
+    }
+
+    /**
+     * @return signed int from the given offset encoded as big endian
+     */
+    public static int getInt(byte[] b, int off)
+    {
+        return (b[off    ] & 255) << 24 |
+               (b[off + 1] & 255) << 16 |
+               (b[off + 2] & 255) <<  8 |
+               (b[off + 3] & 255);
+    }
+
+    /**
+     * @return signed float encoded as big endian
+     */
+    public static float getFloat(byte[] b)
+    {
+        return getFloat(b, 0);
+    }
+
+    /**
+     * @return signed float from the given offset encoded as big endian
+     */
+    public static float getFloat(byte[] b, int off)
+    {
+        return Float.intBitsToFloat(getInt(b, off));
+    }
+
+    /**
+     * @return signed long encoded as big endian
+     */
+    public static long getLong(byte[] b)
+    {
+        return getLong(b, 0);
+    }
+
+    /**
+     * @return signed long from the given offset encoded as big endian
+     */
+    public static long getLong(byte[] b, int off)
+    {
+        return ((long) b[off    ] & 255L) << 56 |
+               ((long) b[off + 1] & 255L) << 48 |
+               ((long) b[off + 2] & 255L) << 40 |
+               ((long) b[off + 3] & 255L) << 32 |
+               ((long) b[off + 4] & 255L) << 24 |
+               ((long) b[off + 5] & 255L) << 16 |
+               ((long) b[off + 6] & 255L) <<  8 |
+               ((long) b[off + 7] & 255L);
+    }
+
+    /**
+     * @return signed double encoded as big endian
+     */
+    public static double getDouble(byte[] b)
+    {
+        return getDouble(b, 0);
+    }
+
+    /**
+     * @return signed double from the given offset encoded as big endian
+     */
+    public static double getDouble(byte[] b, int off)
+    {
+        return Double.longBitsToDouble(getLong(b, off));
+    }
+
+    /*
+     * Methods for packing primitive values into byte arrays starting at given
+     * offsets.
+     */
+
+    public static void putBoolean(byte[] b, int off, boolean val)
+    {
+        ensureCapacity(b, off, 1);
+        b[off] = (byte) (val ? 1 : 0);
+    }
+
+    /**
+     * Store a signed short at the given offset encoded as big endian
+     */
+    public static void putShort(byte[] b, int off, short val)
+    {
+        ensureCapacity(b, off, Short.BYTES);
+        b[off + 1] = (byte) (val      );
+        b[off    ] = (byte) (val >>> 8);
+    }
+
+    /**
+     * Store a signed int at the given offset encoded as big endian
+     */
+    public static void putInt(byte[] b, int off, int val)
+    {
+        ensureCapacity(b, off, Integer.BYTES);
+        b[off + 3] = (byte) (val       );
+        b[off + 2] = (byte) (val >>>  8);
+        b[off + 1] = (byte) (val >>> 16);
+        b[off    ] = (byte) (val >>> 24);
+    }
+
+    /**
+     * Store a signed float at the given offset encoded as big endian
+     */
+    public static void putFloat(byte[] b, int off, float val)
+    {
+        putInt(b, off,  Float.floatToIntBits(val));
+    }
+
+    /**
+     * Store a signed long at the given offset encoded as big endian
+     */
+    public static void putLong(byte[] b, int off, long val)
+    {
+        ensureCapacity(b, off, Long.BYTES);
+        b[off + 7] = (byte) (val       );
+        b[off + 6] = (byte) (val >>>  8);
+        b[off + 5] = (byte) (val >>> 16);
+        b[off + 4] = (byte) (val >>> 24);
+        b[off + 3] = (byte) (val >>> 32);
+        b[off + 2] = (byte) (val >>> 40);
+        b[off + 1] = (byte) (val >>> 48);
+        b[off    ] = (byte) (val >>> 56);
+    }
+
+    /**
+     * Store a signed double at the given offset encoded as big endian
+     */
+    public static void putDouble(byte[] b, int off, double val)
+    {
+        putLong(b, off, Double.doubleToLongBits(val));
+    }
+
+    private static void ensureCapacity(byte[] b, int off, int len)
+    {
+        int writable = b.length - off;
+        if (writable < len)
+        {
+            if (writable < 0)
+                throw new IndexOutOfBoundsException("Attempted to write to offset " + off + " but array length is " + b.length);
+            throw new IndexOutOfBoundsException("Attempted to write " + len + " bytes to array with remaining capacity of " + writable);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/FailingConsumer.java b/src/java/org/apache/cassandra/utils/FailingConsumer.java
new file mode 100644
index 0000000..93cec10
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/FailingConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.function.Consumer;
+
+public interface FailingConsumer<T> extends Consumer<T>
+{
+    void doAccept(T t) throws Throwable;
+
+    default void accept(T t)
+    {
+        try
+        {
+            doAccept(t);
+        }
+        catch (Throwable e)
+        {
+            throw Throwables.throwAsUncheckedException(e);
+        }
+    }
+
+    static <T> FailingConsumer<T> orFail(FailingConsumer<T> fn)
+    {
+        return fn;
+    }
+}
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
index 28430cb..f9ef4cc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
@@ -17,13 +17,35 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.SchemaCQLHelper;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.utils.AbstractTypeGenerators.TypeSupport;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import static org.apache.cassandra.db.SchemaCQLHelper.toCqlType;
+import static org.apache.cassandra.utils.AbstractTypeGenerators.getTypeSupport;
+import static org.apache.cassandra.utils.AbstractTypeGenerators.primitiveTypeGen;
+import static org.apache.cassandra.utils.AbstractTypeGenerators.tupleTypeGen;
+import static org.apache.cassandra.utils.FailingConsumer.orFail;
+import static org.apache.cassandra.utils.Generators.filter;
+import static org.quicktheories.QuickTheory.qt;
 
 public class TupleTypeTest extends CQLTester
 {
@@ -233,5 +255,140 @@ public class TupleTypeTest extends CQLTester
         DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mmX", Locale.ENGLISH);
         assertRows(execute("SELECT tdemo FROM %s"), row(tuple( df.parse("2017-02-03 03:05+0000"), "Europe")));
     }
+
+    @Test
+    public void tuplePartitionReadWrite()
+    {
+        qt().withExamples(100).withShrinkCycles(0).forAll(typesAndRowsGen()).checkAssert(orFail(testcase -> {
+            TupleType tupleType = testcase.type;
+            createTable("CREATE TABLE %s (id " + toCqlType(tupleType) + ", value int, PRIMARY KEY(id))");
+            SortedMap<ByteBuffer, Integer> map = new TreeMap<>(Comparator.comparing(currentTableMetadata().partitioner::decorateKey));
+            int count = 0;
+            for (ByteBuffer value : testcase.uniqueRows)
+            {
+                map.put(value, count);
+                ByteBuffer[] tupleBuffers = tupleType.split(value);
+
+                // use cast to avoid warning
+                execute("INSERT INTO %s (id, value) VALUES (?, ?)", tuple((Object[]) tupleBuffers), count);
+
+                assertRows(execute("SELECT * FROM %s WHERE id = ?", tuple((Object[]) tupleBuffers)),
+                           row(tuple((Object[]) tupleBuffers), count));
+                count++;
+            }
+            assertRows(execute("SELECT * FROM %s LIMIT 100"),
+                       map.entrySet().stream().map(e -> row(e.getKey(), e.getValue())).toArray(Object[][]::new));
+        }));
+    }
+
+    @Test
+    public void tupleCkReadWriteAsc()
+    {
+        tupleCkReadWrite(Order.ASC);
+    }
+
+    @Test
+    public void tupleCkReadWriteDesc()
+    {
+        tupleCkReadWrite(Order.DESC);
+    }
+
+    private void tupleCkReadWrite(Order order)
+    {
+        // for some reason this test is much slower than the partition key test: with 100 examples the partition key test takes 6s while these tests take 20-30s
+        qt().withExamples(50).withShrinkCycles(0).forAll(typesAndRowsGen()).checkAssert(orFail(testcase -> {
+            TupleType tupleType = testcase.type;
+            createTable("CREATE TABLE %s (pk int, ck " + toCqlType(tupleType) + ", value int, PRIMARY KEY(pk, ck))" +
+                        " WITH CLUSTERING ORDER BY (ck "+order.name()+")");
+            String cql = SchemaCQLHelper.getTableMetadataAsCQL(currentTableMetadata(), false, false, false);
+            SortedMap<ByteBuffer, Integer> map = new TreeMap<>(order.apply(tupleType));
+            int count = 0;
+            for (ByteBuffer value : testcase.uniqueRows)
+            {
+                map.put(value, count);
+                ByteBuffer[] tupleBuffers = tupleType.split(value);
+
+                // use cast to avoid warning
+                execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, tuple((Object[]) tupleBuffers), count);
+
+                assertRows(execute("SELECT * FROM %s WHERE pk = ? AND ck = ?", 1, tuple((Object[]) tupleBuffers)),
+                           row(1, tuple((Object[]) tupleBuffers), count));
+                count++;
+            }
+            UntypedResultSet results = execute("SELECT * FROM %s LIMIT 100");
+            assertRows(results,
+                       map.entrySet().stream().map(e -> row(1, e.getKey(), e.getValue())).toArray(Object[][]::new));
+        }));
+    }
+
+    private static final class TypeAndRows
+    {
+        TupleType type;
+        List<ByteBuffer> uniqueRows;
+    }
+
+    private static Gen<TypeAndRows> typesAndRowsGen()
+    {
+        return typesAndRowsGen(10);
+    }
+
+    private static Gen<TypeAndRows> typesAndRowsGen(int numRows)
+    {
+        Gen<TupleType> typeGen = tupleTypeGen(primitiveTypeGen(), SourceDSL.integers().between(1, 10));
+        Set<ByteBuffer> distinctRows = new HashSet<>(numRows); // reuse the memory
+        Gen<TypeAndRows> gen = rnd -> {
+            TypeAndRows c = new TypeAndRows();
+            c.type = typeGen.generate(rnd);
+            TypeSupport<ByteBuffer> support = getTypeSupport(c.type);
+            Gen<ByteBuffer> valueGen = filter(support.valueGen, b -> b.remaining() <= Short.MAX_VALUE);
+            valueGen = filter(valueGen, 20, v -> !distinctRows.contains(v));
+
+            distinctRows.clear();
+            for (int i = 0; i < numRows; i++)
+            {
+                try
+                {
+                    assert distinctRows.add(valueGen.generate(rnd)) : "unable to add distinct row";
+                }
+                catch (IllegalStateException e)
+                {
+                    // gave up trying to find distinct values... so just run with however many rows we could generate
+                    logger.warn("Unable to generate enough distinct rows; using {} rows", distinctRows.size());
+                    break;
+                }
+            }
+            c.uniqueRows = new ArrayList<>(distinctRows);
+            return c;
+        };
+        gen = gen.describedAs(c -> c.type.asCQL3Type().toString());
+        return gen;
+    }
+
+    private enum Order {
+        ASC
+        {
+            <T> Comparator<T> apply(Comparator<T> c)
+            {
+                return c;
+            }
+        },
+        DESC
+        {
+            <T> Comparator<T> apply(Comparator<T> c)
+            {
+                return c.reversed();
+            }
+        };
+
+        abstract <T> Comparator<T> apply(Comparator<T> c);
+    }
+
+    private static List<Object[]> toObjects(UntypedResultSet results)
+    {
+        List<Object[]> rows = new ArrayList<>(results.size());
+        for (UntypedResultSet.Row row : results)
+            rows.add(results.metadata().stream().map(c -> c.type.compose(row.getBlob(c.name.toString()))).toArray());
+        return rows;
+    }
 }
 
diff --git a/test/unit/org/apache/cassandra/db/marshal/TimestampTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/TimestampTypeTest.java
new file mode 100644
index 0000000..b34207f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/marshal/TimestampTypeTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.marshal;
+
+import java.nio.ByteBuffer;
+import java.util.Date;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.AbstractTypeGenerators.TypeSupport;
+import org.assertj.core.api.Assertions;
+
+import static org.apache.cassandra.utils.AbstractTypeGenerators.getTypeSupport;
+import static org.quicktheories.QuickTheory.qt;
+
+public class TimestampTypeTest
+{
+    @Test
+    public void stringProperty()
+    {
+        TypeSupport<Date> support = getTypeSupport(TimestampType.instance);
+        qt().forAll(support.valueGen).checkAssert(date -> {
+            ByteBuffer buffer = TimestampType.instance.decompose(date);
+            String toString = TimestampType.instance.getString(buffer);
+            Assertions.assertThat(TimestampType.instance.fromString(toString))
+                      .as("TimestampType.fromString(TimestampType.getString(buffer)) == buffer;\nviolated with toString %s", toString)
+                      .isEqualTo(buffer);
+        });
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/marshal/TypeValidationTest.java b/test/unit/org/apache/cassandra/db/marshal/TypeValidationTest.java
index ed5e2bf..cdcc5a6 100644
--- a/test/unit/org/apache/cassandra/db/marshal/TypeValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/TypeValidationTest.java
@@ -1,15 +1,3 @@
-package org.apache.cassandra.db.marshal;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.UUIDGen;
-import org.junit.Test;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.UUID;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -28,6 +16,28 @@ import java.util.UUID;
  * limitations under the License.
  */
 
+package org.apache.cassandra.db.marshal;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.assertj.core.api.Assertions;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.UUID;
+
+import static org.apache.cassandra.utils.AbstractTypeGenerators.getTypeSupport;
+import static org.apache.cassandra.utils.AbstractTypeGenerators.primitiveTypeGen;
+import static org.apache.cassandra.utils.AbstractTypeGenerators.userTypeGen;
+import static org.quicktheories.QuickTheory.qt;
 
 public class TypeValidationTest
 {
@@ -125,5 +135,103 @@ public class TypeValidationTest
         UTF8Type.instance.validate(ByteBuffer.wrap(new byte[] {(byte)0xf0, (byte)0x90, (byte)0x81, (byte)0xff}));
     }
 
+    private static Gen<? extends TupleType> flatTupleGen()
+    {
+        return AbstractTypeGenerators.tupleTypeGen(primitiveTypeGen(), SourceDSL.integers().between(0, 20));
+    }
+
+    private static Gen<? extends TupleType> nestedTupleGen()
+    {
+        return AbstractTypeGenerators.tupleTypeGen();
+    }
+
+    private static Gen<? extends TupleType> flatUDTGen()
+    {
+        return userTypeGen(primitiveTypeGen(), SourceDSL.integers().between(0, 20));
+    }
+
+    private static Gen<? extends TupleType> nestedUDTGen()
+    {
+        return AbstractTypeGenerators.userTypeGen();
+    }
+
+    @Test
+    public void buildAndSplitTupleFlat()
+    {
+        buildAndSplit(flatTupleGen());
+    }
+
+    @Test
+    public void buildAndSplitTupleNested()
+    {
+        buildAndSplit(nestedTupleGen());
+    }
+
+    @Test
+    public void buildAndSplitUDTFlat()
+    {
+        buildAndSplit(flatUDTGen());
+    }
+
+    @Test
+    public void buildAndSplitUDTNested()
+    {
+        buildAndSplit(nestedUDTGen());
+    }
+
+    private static void buildAndSplit(Gen<? extends TupleType> baseGen)
+    {
+        qt().forAll(tupleWithValueGen(baseGen)).checkAssert(pair -> {
+            TupleType tuple = pair.left;
+            ByteBuffer value = pair.right;
+            Assertions.assertThat(TupleType.buildValue(tuple.split(value)))
+                      .as("TupleType.buildValue(split(value)) == value")
+                      .isEqualTo(value);
+        });
+    }
+
+    @Test
+    public void validateTupleFlat()
+    {
+        validate(flatTupleGen());
+    }
+
+    @Test
+    public void validateTupleNested()
+    {
+        validate(nestedTupleGen());
+    }
+
+    private static void validate(Gen<? extends TupleType> baseGen)
+    {
+        qt().forAll(tupleWithValueGen(baseGen)).checkAssert(pair -> {
+            TupleType tuple = pair.left;
+            ByteBuffer value = pair.right;
+            tuple.validate(value);
+        });
+    }
+
+    private static Gen<Pair<TupleType, ByteBuffer>> tupleWithValueGen(Gen<? extends TupleType> baseGen)
+    {
+        Gen<Pair<TupleType, ByteBuffer>> gen = rnd -> {
+            TupleType type = baseGen.generate(rnd);
+            return Pair.create(type, getTypeSupport(type).valueGen.generate(rnd));
+        };
+        gen = gen.describedAs(pair -> pair.left.asCQL3Type().toString());
+        return gen;
+    }
+
+    @Test
+    public void validateUDTFlat()
+    {
+        validate(flatUDTGen());
+    }
+
+    @Test
+    public void validateUDTNested()
+    {
+        validate(nestedUDTGen());
+    }
+
     // todo: for completeness, should test invalid two byte pairs.
 }
diff --git a/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java b/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java
new file mode 100644
index 0000000..7750f15
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessageSerializationPropertyTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataProvider;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CassandraGenerators;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FixedMonotonicClock;
+import org.assertj.core.api.Assertions;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.net.Message.serializer;
+import static org.apache.cassandra.utils.CassandraGenerators.MESSAGE_GEN;
+import static org.apache.cassandra.utils.FailingConsumer.orFail;
+import static org.quicktheories.QuickTheory.qt;
+
+public class MessageSerializationPropertyTest implements Serializable
+{
+    @BeforeClass
+    public static void beforeClass()
+    {
+        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+        // message serialization uses the MonotonicClock class for precise and approx timestamps, so mock it out
+        System.setProperty("cassandra.monotonic_clock.precise", FixedMonotonicClock.class.getName());
+        System.setProperty("cassandra.monotonic_clock.approx", FixedMonotonicClock.class.getName());
+
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    /**
+     * Validates that {@link Message#serializedSize(int)} == {@link Message.Serializer#serialize(Message, DataOutputPlus, int)} size.
+     */
+    @Test
+    public void serializeSizeProperty()
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer(1024))
+        {
+            qt().forAll(MESSAGE_GEN).checkAssert(orFail(message -> {
+                for (MessagingService.Version version : MessagingService.Version.values())
+                {
+                    out.clear();
+                    serializer.serialize(message, out, version.value);
+                    Assertions.assertThat(out.getLength())
+                              .as("Property serialize(out, version).length == serializedSize(version) " +
+                                  "was violated for version %s and verb %s",
+                                  version, message.header.verb)
+                              .isEqualTo(message.serializedSize(version.value));
+                }
+            }));
+        }
+    }
+
+    /**
+     * Message and payload don't define equals, so we have to rely on another way to define equality: serialized bytes!
+     * The assumption is that serialize(deserialize(serialize(message))) == serialize(message)
+     */
+    @Test
+    public void testMessageSerialization() throws Exception
+    {
+        TableMetadataProvider schema = Mockito.mock(TableMetadataProvider.class, Mockito.CALLS_REAL_METHODS);
+        ReadCommand.Serializer readCommandSerializer = new ReadCommand.Serializer(schema);
+        Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> original = Verb.READ_REQ.unsafeSetSerializer(() -> readCommandSerializer);
+        try (DataOutputBuffer first = new DataOutputBuffer(1024);
+             DataOutputBuffer second = new DataOutputBuffer(1024))
+        {
+            qt().forAll(MESSAGE_GEN).checkAssert(orFail(message -> {
+                withTable(schema, message, orFail(ignore -> {
+                    for (MessagingService.Version version : MessagingService.Version.values())
+                    {
+                        first.clear();
+                        second.clear();
+
+                        serializer.serialize(message, first, version.value);
+                        Message<Object> read = serializer.deserialize(new DataInputBuffer(first.buffer(), true), FBUtilities.getBroadcastAddressAndPort(), version.value);
+                        serializer.serialize(read, second, version.value);
+                        // compare as hex since byte buffer equality kept failing and the differences were harder to debug;
+                        // hex output shows the specific section of the string that differs
+                        Assertions.assertThat(ByteBufferUtil.bytesToHex(second.buffer()))
+                                  .as("Property serialize(deserialize(serialize(message))) == serialize(message) "
+                                      + "was violated for version %s and verb %s"
+                                      + "\n first=%s"
+                                      + "\nsecond=%s\n",
+                                      version,
+                                      message.header.verb,
+                                      // toString methods are not reliable for messages, so use reflection to generate one
+                                      new Object() { public String toString() { return CassandraGenerators.toStringRecursive(message); } },
+                                      new Object() { public String toString() { return CassandraGenerators.toStringRecursive(read); } })
+                                  .isEqualTo(ByteBufferUtil.bytesToHex(first.buffer()));
+                    }
+                }));
+            }));
+        }
+        finally
+        {
+            Verb.READ_REQ.unsafeSetSerializer(original);
+        }
+    }
+
+    private static void withTable(TableMetadataProvider schema, Message<?> message, Consumer<TableMetadata> fn)
+    {
+        TableMetadata metadata = null;
+        if (message.payload instanceof ReadQuery)
+            metadata = ((ReadQuery) message.payload).metadata();
+
+        if (metadata != null)
+            Mockito.when(schema.getTableMetadata(metadata.id)).thenReturn(metadata);
+
+        fn.accept(metadata);
+
+        if (metadata != null)
+            Mockito.when(schema.getTableMetadata(metadata.id)).thenReturn(null);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java b/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
new file mode 100644
index 0000000..9eb74ee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.DoubleType;
+import org.apache.cassandra.db.marshal.EmptyType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ShortType;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.quicktheories.core.Gen;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.generators.SourceDSL;
+
+import static org.apache.cassandra.utils.Generators.IDENTIFIER_GEN;
+
+public final class AbstractTypeGenerators
+{
+    private static final Gen<Integer> VERY_SMALL_POSITIVE_SIZE_GEN = SourceDSL.integers().between(1, 3);
+    private static final Gen<Boolean> BOOLEAN_GEN = SourceDSL.booleans().all();
+
+    private static final Map<AbstractType<?>, TypeSupport<?>> PRIMITIVE_TYPE_DATA_GENS =
+    Stream.of(TypeSupport.of(BooleanType.instance, BOOLEAN_GEN),
+              TypeSupport.of(ByteType.instance, SourceDSL.integers().between(0, Byte.MAX_VALUE * 2 + 1).map(Integer::byteValue)),
+              TypeSupport.of(ShortType.instance, SourceDSL.integers().between(0, Short.MAX_VALUE * 2 + 1).map(Integer::shortValue)),
+              TypeSupport.of(Int32Type.instance, SourceDSL.integers().all()),
+              TypeSupport.of(LongType.instance, SourceDSL.longs().all()),
+              TypeSupport.of(FloatType.instance, SourceDSL.floats().any()),
+              TypeSupport.of(DoubleType.instance, SourceDSL.doubles().any()),
+              TypeSupport.of(BytesType.instance, Generators.bytes(0, 1024)),
+              TypeSupport.of(UUIDType.instance, Generators.UUID_RANDOM_GEN),
+              TypeSupport.of(InetAddressType.instance, Generators.INET_ADDRESS_UNRESOLVED_GEN), // serialization strips the hostname, only keeps the address
+              TypeSupport.of(AsciiType.instance, SourceDSL.strings().ascii().ofLengthBetween(0, 1024)),
+              TypeSupport.of(UTF8Type.instance, Generators.utf8(0, 1024)),
+              TypeSupport.of(TimestampType.instance, Generators.DATE_GEN),
+              // null is desired here as #decompose will call org.apache.cassandra.serializers.EmptySerializer.serialize which ignores the input and returns empty bytes
+              TypeSupport.of(EmptyType.instance, rnd -> null)
+              //TODO add the following
+              // IntegerType.instance,
+              // DecimalType.instance,
+              // TimeUUIDType.instance,
+              // LexicalUUIDType.instance,
+              // SimpleDateType.instance,
+              // TimeType.instance,
+              // DurationType.instance,
+    ).collect(Collectors.toMap(t -> t.type, t -> t));
+    // NOTE: not supporting reversed as CQL doesn't allow nested reversed types;
+    // when generating part of the clustering key, it would be good to allow reversed types at the top level
+    private static final Gen<AbstractType<?>> PRIMITIVE_TYPE_GEN = SourceDSL.arbitrary().pick(new ArrayList<>(PRIMITIVE_TYPE_DATA_GENS.keySet()));
+
+    private AbstractTypeGenerators()
+    {
+
+    }
+
+    public enum TypeKind
+    {
+        PRIMITIVE, SET, LIST, MAP, TUPLE, UDT
+    }
+
+    private static final Gen<TypeKind> TYPE_KIND_GEN = SourceDSL.arbitrary().enumValuesWithNoOrder(TypeKind.class);
+
+    public static Gen<AbstractType<?>> primitiveTypeGen()
+    {
+        return PRIMITIVE_TYPE_GEN;
+    }
+
+    public static Gen<AbstractType<?>> typeGen()
+    {
+        return typeGen(3);
+    }
+
+    public static Gen<AbstractType<?>> typeGen(int maxDepth)
+    {
+        return typeGen(maxDepth, TYPE_KIND_GEN, VERY_SMALL_POSITIVE_SIZE_GEN);
+    }
+
+    public static Gen<AbstractType<?>> typeGen(int maxDepth, Gen<TypeKind> typeKindGen, Gen<Integer> sizeGen)
+    {
+        assert maxDepth >= 0 : "max depth must be positive or zero; given " + maxDepth;
+        boolean atBottom = maxDepth == 0;
+        return rnd -> {
+            // figure out type to get
+            TypeKind kind = typeKindGen.generate(rnd);
+            switch (kind)
+            {
+                case PRIMITIVE:
+                    return PRIMITIVE_TYPE_GEN.generate(rnd);
+                case SET:
+                    return setTypeGen(atBottom ? PRIMITIVE_TYPE_GEN : typeGen(maxDepth - 1, typeKindGen, sizeGen)).generate(rnd);
+                case LIST:
+                    return listTypeGen(atBottom ? PRIMITIVE_TYPE_GEN : typeGen(maxDepth - 1, typeKindGen, sizeGen)).generate(rnd);
+                case MAP:
+                    return mapTypeGen(atBottom ? PRIMITIVE_TYPE_GEN : typeGen(maxDepth - 1, typeKindGen, sizeGen)).generate(rnd);
+                case TUPLE:
+                    return tupleTypeGen(atBottom ? PRIMITIVE_TYPE_GEN : typeGen(maxDepth - 1, typeKindGen, sizeGen), sizeGen).generate(rnd);
+                case UDT:
+                    return userTypeGen(atBottom ? PRIMITIVE_TYPE_GEN : typeGen(maxDepth - 1, typeKindGen, sizeGen), sizeGen).generate(rnd);
+                default:
+                    throw new IllegalArgumentException("Unknown kind: " + kind);
+            }
+        };
+    }
+
+    @SuppressWarnings("unused")
+    public static Gen<SetType<?>> setTypeGen()
+    {
+        return setTypeGen(typeGen(2)); // lower the default depth since this is already a nested type
+    }
+
+    public static Gen<SetType<?>> setTypeGen(Gen<AbstractType<?>> typeGen)
+    {
+        return rnd -> SetType.getInstance(typeGen.generate(rnd), BOOLEAN_GEN.generate(rnd));
+    }
+
+    @SuppressWarnings("unused")
+    public static Gen<ListType<?>> listTypeGen()
+    {
+        return listTypeGen(typeGen(2)); // lower the default depth since this is already a nested type
+    }
+
+    public static Gen<ListType<?>> listTypeGen(Gen<AbstractType<?>> typeGen)
+    {
+        return rnd -> ListType.getInstance(typeGen.generate(rnd), BOOLEAN_GEN.generate(rnd));
+    }
+
+    @SuppressWarnings("unused")
+    public static Gen<MapType<?, ?>> mapTypeGen()
+    {
+        return mapTypeGen(typeGen(2)); // lower the default depth since this is already a nested type
+    }
+
+    public static Gen<MapType<?, ?>> mapTypeGen(Gen<AbstractType<?>> typeGen)
+    {
+        return mapTypeGen(typeGen, typeGen);
+    }
+
+    public static Gen<MapType<?, ?>> mapTypeGen(Gen<AbstractType<?>> keyGen, Gen<AbstractType<?>> valueGen)
+    {
+        return rnd -> MapType.getInstance(keyGen.generate(rnd), valueGen.generate(rnd), BOOLEAN_GEN.generate(rnd));
+    }
+
+    public static Gen<TupleType> tupleTypeGen()
+    {
+        return tupleTypeGen(typeGen(2)); // lower the default depth since this is already a nested type
+    }
+
+    public static Gen<TupleType> tupleTypeGen(Gen<AbstractType<?>> elementGen)
+    {
+        return tupleTypeGen(elementGen, VERY_SMALL_POSITIVE_SIZE_GEN);
+    }
+
+    public static Gen<TupleType> tupleTypeGen(Gen<AbstractType<?>> elementGen, Gen<Integer> sizeGen)
+    {
+        return rnd -> {
+            int numElements = sizeGen.generate(rnd);
+            List<AbstractType<?>> elements = new ArrayList<>(numElements);
+            for (int i = 0; i < numElements; i++)
+                elements.add(elementGen.generate(rnd));
+            return new TupleType(elements);
+        };
+    }
+
+    public static Gen<UserType> userTypeGen()
+    {
+        return userTypeGen(typeGen(2)); // lower the default depth since this is already a nested type
+    }
+
+    public static Gen<UserType> userTypeGen(Gen<AbstractType<?>> elementGen)
+    {
+        return userTypeGen(elementGen, VERY_SMALL_POSITIVE_SIZE_GEN);
+    }
+
+    public static Gen<UserType> userTypeGen(Gen<AbstractType<?>> elementGen, Gen<Integer> sizeGen)
+    {
+        Gen<FieldIdentifier> fieldNameGen = IDENTIFIER_GEN.map(FieldIdentifier::forQuoted);
+        return rnd -> {
+            boolean multiCell = BOOLEAN_GEN.generate(rnd);
+            int numElements = sizeGen.generate(rnd);
+            List<AbstractType<?>> fieldTypes = new ArrayList<>(numElements);
+            LinkedHashSet<FieldIdentifier> fieldNames = new LinkedHashSet<>(numElements);
+            String ks = IDENTIFIER_GEN.generate(rnd);
+            ByteBuffer name = AsciiType.instance.decompose(IDENTIFIER_GEN.generate(rnd));
+
+            Gen<FieldIdentifier> distinctNameGen = Generators.filter(fieldNameGen, 30, e -> !fieldNames.contains(e));
+            // UDTs don't allow duplicate names, so make sure all names are unique
+            for (int i = 0; i < numElements; i++)
+            {
+                fieldTypes.add(elementGen.generate(rnd));
+                fieldNames.add(distinctNameGen.generate(rnd));
+            }
+            return new UserType(ks, name, new ArrayList<>(fieldNames), fieldTypes, multiCell);
+        };
+    }
+
+    public static Gen<AbstractType<?>> allowReversed(Gen<AbstractType<?>> gen)
+    {
+        return rnd -> BOOLEAN_GEN.generate(rnd) ? ReversedType.getInstance(gen.generate(rnd)) : gen.generate(rnd);
+    }
+
+    /**
+     * For a type, create generators for data that matches that type
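+     *
+     * <p>Usage sketch, assuming the QuickTheories {@code qt()} entry point used by tests in this patch:
+     * <pre>{@code
+     * TypeSupport<Integer> support = getTypeSupport(Int32Type.instance);
+     * qt().forAll(support.bytesGen()).checkAssert(bytes -> Int32Type.instance.validate(bytes));
+     * }</pre>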
+     */
+    public static <T> TypeSupport<T> getTypeSupport(AbstractType<T> type)
+    {
+        return getTypeSupport(type, VERY_SMALL_POSITIVE_SIZE_GEN);
+    }
+
+    /**
+     * For a type, create generators for data that matches that type
+     */
+    public static <T> TypeSupport<T> getTypeSupport(AbstractType<T> type, Gen<Integer> sizeGen)
+    {
+        // this doesn't affect the data, only sort order, so drop it
+        if (type.isReversed())
+            type = ((ReversedType<T>) type).baseType;
+        // cast is safe since type is a constant and was type cast while inserting into the map
+        @SuppressWarnings("unchecked")
+        TypeSupport<T> gen = (TypeSupport<T>) PRIMITIVE_TYPE_DATA_GENS.get(type);
+        if (gen != null)
+            return gen;
+        // might be... complex...
+        if (type instanceof SetType)
+        {
+            // T = Set<A>, so we cannot use T here
+            SetType<Object> setType = (SetType<Object>) type;
+            TypeSupport<?> elementSupport = getTypeSupport(setType.getElementsType(), sizeGen);
+            @SuppressWarnings("unchecked")
+            TypeSupport<T> support = (TypeSupport<T>) TypeSupport.of(setType, rnd -> {
+                int size = sizeGen.generate(rnd);
+                HashSet<Object> set = Sets.newHashSetWithExpectedSize(size);
+                for (int i = 0; i < size; i++)
+                    set.add(elementSupport.valueGen.generate(rnd));
+                return set;
+            });
+            return support;
+        }
+        else if (type instanceof ListType)
+        {
+            // T = List<A>, so we cannot use T here
+            ListType<Object> listType = (ListType<Object>) type;
+            TypeSupport<?> elementSupport = getTypeSupport(listType.getElementsType(), sizeGen);
+            @SuppressWarnings("unchecked")
+            TypeSupport<T> support = (TypeSupport<T>) TypeSupport.of(listType, rnd -> {
+                int size = sizeGen.generate(rnd);
+                List<Object> list = new ArrayList<>(size);
+                for (int i = 0; i < size; i++)
+                    list.add(elementSupport.valueGen.generate(rnd));
+                return list;
+            });
+            return support;
+        }
+        else if (type instanceof MapType)
+        {
+            // T = Map<A, B>, so we cannot use T here
+            MapType<Object, Object> mapType = (MapType<Object, Object>) type;
+            TypeSupport<?> keySupport = getTypeSupport(mapType.getKeysType(), sizeGen);
+            TypeSupport<?> valueSupport = getTypeSupport(mapType.getValuesType(), sizeGen);
+            @SuppressWarnings("unchecked")
+            TypeSupport<T> support = (TypeSupport<T>) TypeSupport.of(mapType, rnd -> {
+                int size = sizeGen.generate(rnd);
+                Map<Object, Object> map = Maps.newHashMapWithExpectedSize(size);
+                // if there is a conflict, that's fine
+                for (int i = 0; i < size; i++)
+                    map.put(keySupport.valueGen.generate(rnd), valueSupport.valueGen.generate(rnd));
+                return map;
+            });
+            return support;
+        }
+        else if (type instanceof TupleType) // includes UserType
+        {
+            // T is ByteBuffer
+            TupleType tupleType = (TupleType) type;
+            @SuppressWarnings("unchecked")
+            TypeSupport<T> support = (TypeSupport<T>) TypeSupport.of(tupleType, new TupleGen(tupleType, sizeGen));
+            return support;
+        }
+        throw new UnsupportedOperationException("Unsupported type: " + type);
+    }
+
+    private static final class TupleGen implements Gen<ByteBuffer>
+    {
+        private final List<TypeSupport<Object>> elementsSupport;
+
+        @SuppressWarnings("unchecked")
+        private TupleGen(TupleType tupleType, Gen<Integer> sizeGen)
+        {
+            this.elementsSupport = tupleType.allTypes().stream().map(t -> getTypeSupport((AbstractType<Object>) t, sizeGen)).collect(Collectors.toList());
+        }
+
+        public ByteBuffer generate(RandomnessSource rnd)
+        {
+            List<TypeSupport<Object>> eSupport = this.elementsSupport;
+            ByteBuffer[] elements = new ByteBuffer[eSupport.size()];
+            for (int i = 0; i < eSupport.size(); i++)
+            {
+                TypeSupport<Object> support = eSupport.get(i);
+                elements[i] = support.type.decompose(support.valueGen.generate(rnd));
+            }
+            return TupleType.buildValue(elements);
+        }
+    }
+
+    /**
+     * Pair of {@link AbstractType} and a Generator of values that are handled by that type.
+     */
+    public static final class TypeSupport<T>
+    {
+        public final AbstractType<T> type;
+        public final Gen<T> valueGen;
+
+        private TypeSupport(AbstractType<T> type, Gen<T> valueGen)
+        {
+            this.type = Objects.requireNonNull(type);
+            this.valueGen = Objects.requireNonNull(valueGen);
+        }
+
+        public static <T> TypeSupport<T> of(AbstractType<T> type, Gen<T> valueGen)
+        {
+            return new TypeSupport<>(type, valueGen);
+        }
+
+        /**
+         * Generator which composes the values gen with {@link AbstractType#decompose(Object)}
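+         * (e.g. the {@code Int32Type} support turns each generated {@code Integer} into its serialized {@code ByteBuffer})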
+         */
+        public Gen<ByteBuffer> bytesGen()
+        {
+            return rnd -> type.decompose(valueGen.generate(rnd));
+        }
+
+        public String toString()
+        {
+            return "TypeSupport{" +
+                   "type=" + type +
+                   '}';
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/ByteArrayUtilTest.java b/test/unit/org/apache/cassandra/utils/ByteArrayUtilTest.java
new file mode 100644
index 0000000..fe965fe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/ByteArrayUtilTest.java
@@ -0,0 +1,211 @@
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.quicktheories.QuickTheory.qt;
+
+public class ByteArrayUtilTest
+{
+    private static final ByteOrder ORDER = ByteOrder.BIG_ENDIAN;
+
+    @Test
+    public void putGetBoolean()
+    {
+        byte[] bytes = new byte[10];
+        for (int i = 0; i < bytes.length; i++)
+        {
+            for (boolean b : Arrays.asList(Boolean.TRUE, Boolean.FALSE))
+            {
+                ByteArrayUtil.putBoolean(bytes, i, b);
+                assertThat(ByteArrayUtil.getBoolean(bytes, i))
+                          .as("get(put(b)) == b")
+                          .isEqualTo(b);
+            }
+        }
+    }
+
+    @Test
+    public void putBooleanArrayTooSmall()
+    {
+        putArrayTooSmall(1, bytes -> ByteArrayUtil.putBoolean(bytes, 0, true));
+    }
+
+    @Test
+    public void putBooleanArrayOutOfBounds()
+    {
+        byte[] bytes = new byte[16];
+        assertThatThrownBy(() -> ByteArrayUtil.putBoolean(bytes, bytes.length + 10, true))
+        .hasMessageContaining("Attempted to write to offset 26 but array length is 16");
+    }
+
+    @Test
+    public void putGetShort()
+    {
+        Gen<Short> gen = SourceDSL.integers().between(Short.MIN_VALUE, Short.MAX_VALUE).map(Integer::shortValue);
+        byte[] bytes = new byte[Short.BYTES + 1];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ORDER);
+        qt().forAll(gen).checkAssert(jnum -> {
+            short value = jnum.shortValue();
+            ByteArrayUtil.putShort(bytes, 1, value);
+            assertThat(ByteArrayUtil.getShort(bytes, 1))
+                      .as("get(put(b)) == b")
+                      .isEqualTo(value)
+                      .isEqualTo(buffer.getShort(1));
+        });
+    }
+
+    @Test
+    public void putShortArrayTooSmall()
+    {
+        putArrayTooSmall(Short.BYTES, bytes -> ByteArrayUtil.putShort(bytes, 0, (short) 42));
+    }
+
+    @Test
+    public void putShortArrayOutOfBounds()
+    {
+        byte[] bytes = new byte[16];
+        assertThatThrownBy(() -> ByteArrayUtil.putShort(bytes, bytes.length + 10, (short) 42))
+        .hasMessageContaining("Attempted to write to offset 26 but array length is 16");
+    }
+
+    @Test
+    public void putGetInt()
+    {
+        Gen<Integer> gen = SourceDSL.integers().all();
+        byte[] bytes = new byte[Integer.BYTES + 1];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ORDER);
+        qt().forAll(gen).checkAssert(jnum -> {
+            int value = jnum.intValue();
+            ByteArrayUtil.putInt(bytes, 1, value);
+            assertThat(ByteArrayUtil.getInt(bytes, 1))
+                      .as("get(put(b)) == b")
+                      .isEqualTo(value)
+                      .isEqualTo(buffer.getInt(1));
+        });
+    }
+
+    @Test
+    public void putIntArrayTooSmall()
+    {
+        putArrayTooSmall(Integer.BYTES, bytes -> ByteArrayUtil.putInt(bytes, 0, 42));
+    }
+
+    @Test
+    public void putIntArrayOutOfBounds()
+    {
+        byte[] bytes = new byte[16];
+        assertThatThrownBy(() -> ByteArrayUtil.putInt(bytes, bytes.length + 10, 42))
+        .hasMessageContaining("Attempted to write to offset 26 but array length is 16");
+    }
+
+    @Test
+    public void putGetLong()
+    {
+        Gen<Long> gen = SourceDSL.longs().all();
+        byte[] bytes = new byte[Long.BYTES + 1];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ORDER);
+        qt().forAll(gen).checkAssert(jnum -> {
+            long value = jnum.longValue();
+            ByteArrayUtil.putLong(bytes, 1, value);
+            assertThat(ByteArrayUtil.getLong(bytes, 1))
+                      .as("get(put(b)) == b")
+                      .isEqualTo(value)
+                      .isEqualTo(buffer.getLong(1));
+        });
+    }
+
+    @Test
+    public void putLongArrayTooSmall()
+    {
+        putArrayTooSmall(Long.BYTES, bytes -> ByteArrayUtil.putLong(bytes, 0, 42L));
+    }
+
+    @Test
+    public void putLongArrayOutOfBounds()
+    {
+        byte[] bytes = new byte[16];
+        assertThatThrownBy(() -> ByteArrayUtil.putLong(bytes, bytes.length + 10, 42))
+        .hasMessageContaining("Attempted to write to offset 26 but array length is 16");
+    }
+
+    @Test
+    public void putGetFloat()
+    {
+        Gen<Float> gen = SourceDSL.floats().any();
+        byte[] bytes = new byte[Float.BYTES + 1];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ORDER);
+        qt().forAll(gen).checkAssert(jnum -> {
+            float value = jnum.floatValue();
+            ByteArrayUtil.putFloat(bytes, 1, value);
+            assertThat(ByteArrayUtil.getFloat(bytes, 1))
+                      .as("get(put(b)) == b")
+                      .isEqualTo(value)
+                      .isEqualTo(buffer.getFloat(1));
+        });
+    }
+
+    @Test
+    public void putFloatArrayTooSmall()
+    {
+        putArrayTooSmall(Float.BYTES, bytes -> ByteArrayUtil.putFloat(bytes, 0, 42f));
+    }
+
+    @Test
+    public void putFloatArrayOutOfBounds()
+    {
+        byte[] bytes = new byte[16];
+        assertThatThrownBy(() -> ByteArrayUtil.putFloat(bytes, bytes.length + 10, 42.0f))
+        .hasMessageContaining("Attempted to write to offset 26 but array length is 16");
+    }
+
+    @Test
+    public void putGetDouble()
+    {
+        Gen<Double> gen = SourceDSL.doubles().any();
+        byte[] bytes = new byte[Double.BYTES + 1];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ORDER);
+        qt().forAll(gen).checkAssert(jnum -> {
+            double value = jnum.doubleValue();
+            ByteArrayUtil.putDouble(bytes, 1, value);
+            assertThat(ByteArrayUtil.getDouble(bytes, 1))
+                      .as("get(put(b)) == b")
+                      .isEqualTo(value)
+                      .isEqualTo(buffer.getDouble(1));
+        });
+    }
+
+    @Test
+    public void putDoubleArrayTooSmall()
+    {
+        putArrayTooSmall(Double.BYTES, bytes -> ByteArrayUtil.putDouble(bytes, 0, 42.0));
+    }
+
+    @Test
+    public void putDoubleArrayOutOfBounds()
+    {
+        byte[] bytes = new byte[16];
+        assertThatThrownBy(() -> ByteArrayUtil.putDouble(bytes, bytes.length + 10, 42.0))
+            .hasMessageContaining("Attempted to write to offset 26 but array length is 16");
+    }
+
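+    /**
+     * Runs {@code fn} against arrays that are too small to hold {@code targetBytes} bytes and asserts that an
+     * {@link IndexOutOfBoundsException} is thrown without any partial write to the array.
+     */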
+    private static void putArrayTooSmall(int targetBytes, FailingConsumer<byte[]> fn)
+    {
+        for (int i = 0; i < targetBytes - 1; i++)
+        {
+            byte[] bytes = new byte[i];
+            assertThatThrownBy(() -> fn.doAccept(bytes)).isInstanceOf(IndexOutOfBoundsException.class)
+                                                        .hasMessageContaining("Attempted to write " + targetBytes + " bytes")
+                                                        .hasMessageContaining("remaining capacity of " + i);
+            assertThat(bytes).isEqualTo(new byte[i]);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
new file mode 100644
index 0000000..575dcf2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SchemaCQLHelper;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.EmptyType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ConnectionType;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.PingRequest;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableParams;
+import org.quicktheories.core.Gen;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.generators.Generate;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.utils.AbstractTypeGenerators.allowReversed;
+import static org.apache.cassandra.utils.AbstractTypeGenerators.getTypeSupport;
+import static org.apache.cassandra.utils.Generators.IDENTIFIER_GEN;
+
+public final class CassandraGenerators
+{
+    private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n", Pattern.LITERAL);
+
+    // utility generators for creating more complex types
+    private static final Gen<Integer> SMALL_POSITIVE_SIZE_GEN = SourceDSL.integers().between(1, 30);
+    private static final Gen<Boolean> BOOLEAN_GEN = SourceDSL.booleans().all();
+
+    private static final Gen<IPartitioner> PARTITIONER_GEN = SourceDSL.arbitrary().pick(Murmur3Partitioner.instance,
+                                                                                        ByteOrderedPartitioner.instance,
+                                                                                        new LocalPartitioner(TimeUUIDType.instance),
+                                                                                        OrderPreservingPartitioner.instance,
+                                                                                        RandomPartitioner.instance);
+
+
+    private static final Gen<TableMetadata.Kind> TABLE_KIND_GEN = SourceDSL.arbitrary().pick(TableMetadata.Kind.REGULAR, TableMetadata.Kind.INDEX, TableMetadata.Kind.VIRTUAL);
+    public static final Gen<TableMetadata> TABLE_METADATA_GEN = gen(rnd -> createTableMetadata(IDENTIFIER_GEN.generate(rnd), rnd)).describedAs(CassandraGenerators::toStringRecursive);
+
+    private static final Gen<SinglePartitionReadCommand> SINGLE_PARTITION_READ_COMMAND_GEN = gen(rnd -> {
+        TableMetadata metadata = TABLE_METADATA_GEN.generate(rnd);
+        int nowInSec = (int) rnd.next(Constraint.between(1, Integer.MAX_VALUE));
+        ByteBuffer key = partitionKeyDataGen(metadata).generate(rnd);
+        //TODO support all fields of SinglePartitionReadCommand
+        return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+    }).describedAs(CassandraGenerators::toStringRecursive);
+    private static final Gen<? extends ReadCommand> READ_COMMAND_GEN = Generate.oneOf(SINGLE_PARTITION_READ_COMMAND_GEN)
+                                                                               .describedAs(CassandraGenerators::toStringRecursive);
+
+    // Outbound messages
+    private static final Gen<ConnectionType> CONNECTION_TYPE_GEN = SourceDSL.arbitrary().pick(ConnectionType.URGENT_MESSAGES, ConnectionType.SMALL_MESSAGES, ConnectionType.LARGE_MESSAGES);
+    public static final Gen<Message<PingRequest>> MESSAGE_PING_GEN = CONNECTION_TYPE_GEN
+                                                                     .map(t -> Message.builder(Verb.PING_REQ, PingRequest.get(t)).build())
+                                                                     .describedAs(CassandraGenerators::toStringRecursive);
+    public static final Gen<Message<? extends ReadCommand>> MESSAGE_READ_COMMAND_GEN = READ_COMMAND_GEN
+                                                                                       .<Message<? extends ReadCommand>>map(c -> Message.builder(Verb.READ_REQ, c).build())
+                                                                                       .describedAs(CassandraGenerators::toStringRecursive);
+
+    public static final Gen<Message<?>> MESSAGE_GEN = Generate.oneOf(cast(MESSAGE_PING_GEN),
+                                                                     cast(MESSAGE_READ_COMMAND_GEN))
+                                                              .describedAs(CassandraGenerators::toStringRecursive);
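+
+    // Usage sketch, assuming the QuickTheories qt() entry point:
+    //   qt().forAll(MESSAGE_GEN).checkAssert(message -> { /* property under test */ });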
+
+    private CassandraGenerators()
+    {
+
+    }
+
+    private static TableMetadata createTableMetadata(String ks, RandomnessSource rnd)
+    {
+        String tableName = IDENTIFIER_GEN.generate(rnd);
+        TableMetadata.Builder builder = TableMetadata.builder(ks, tableName, TableId.fromUUID(Generators.UUID_RANDOM_GEN.generate(rnd)))
+                                                     .partitioner(PARTITIONER_GEN.generate(rnd))
+                                                     .kind(TABLE_KIND_GEN.generate(rnd))
+                                                     .isCounter(BOOLEAN_GEN.generate(rnd))
+                                                     .params(TableParams.builder().build());
+
+        // generate columns
+        // must have a non-zero number of partition columns, but may have 0 for the rest; SMALL_POSITIVE_SIZE_GEN won't return 0
+        int numPartitionColumns = SMALL_POSITIVE_SIZE_GEN.generate(rnd);
+        int numClusteringColumns = SMALL_POSITIVE_SIZE_GEN.generate(rnd) - 1;
+        int numRegularColumns = SMALL_POSITIVE_SIZE_GEN.generate(rnd) - 1;
+        int numStaticColumns = SMALL_POSITIVE_SIZE_GEN.generate(rnd) - 1;
+
+        Set<String> createdColumnNames = new HashSet<>();
+        for (int i = 0; i < numPartitionColumns; i++)
+            builder.addColumn(createColumnDefinition(ks, tableName, ColumnMetadata.Kind.PARTITION_KEY, createdColumnNames, rnd));
+        for (int i = 0; i < numClusteringColumns; i++)
+            builder.addColumn(createColumnDefinition(ks, tableName, ColumnMetadata.Kind.CLUSTERING, createdColumnNames, rnd));
+        for (int i = 0; i < numStaticColumns; i++)
+            builder.addColumn(createColumnDefinition(ks, tableName, ColumnMetadata.Kind.STATIC, createdColumnNames, rnd));
+        for (int i = 0; i < numRegularColumns; i++)
+            builder.addColumn(createColumnDefinition(ks, tableName, ColumnMetadata.Kind.REGULAR, createdColumnNames, rnd));
+
+        return builder.build();
+    }
+
+    private static ColumnMetadata createColumnDefinition(String ks, String table,
+                                                         ColumnMetadata.Kind kind,
+                                                         Set<String> createdColumnNames, /* This is mutated to check for collisions, so has a side effect outside of normal random generation */
+                                                         RandomnessSource rnd)
+    {
+        Gen<AbstractType<?>> typeGen = AbstractTypeGenerators.typeGen();
+        switch (kind)
+        {
+            // partition and clustering keys require frozen types, so make sure all types generated will be frozen
+            // empty type is also not supported, so filter it out
+            case PARTITION_KEY:
+            case CLUSTERING:
+                typeGen = Generators.filter(typeGen, t -> t != EmptyType.instance).map(AbstractType::freeze);
+                break;
+        }
+        if (kind == ColumnMetadata.Kind.CLUSTERING)
+        {
+            // when working on a clustering column, add in reversed types periodically
+            typeGen = allowReversed(typeGen);
+        }
+        // filter for unique names
+        String str;
+        while (!createdColumnNames.add(str = IDENTIFIER_GEN.generate(rnd)))
+        {
+        }
+        ColumnIdentifier name = new ColumnIdentifier(str, true);
+        int position = !kind.isPrimaryKeyKind() ? -1 : (int) rnd.next(Constraint.between(0, 30));
+        return new ColumnMetadata(ks, table, name, typeGen.generate(rnd), position, kind);
+    }
+
+    public static Gen<ByteBuffer> partitionKeyDataGen(TableMetadata metadata)
+    {
+        ImmutableList<ColumnMetadata> columns = metadata.partitionKeyColumns();
+        assert !columns.isEmpty() : "Unable to find partition key columns";
+        if (columns.size() == 1)
+            return getTypeSupport(columns.get(0).type).bytesGen();
+        List<Gen<ByteBuffer>> columnGens = new ArrayList<>(columns.size());
+        for (ColumnMetadata cm : columns)
+            columnGens.add(getTypeSupport(cm.type).bytesGen());
+        return rnd -> {
+            ByteBuffer[] buffers = new ByteBuffer[columnGens.size()];
+            for (int i = 0; i < columnGens.size(); i++)
+                buffers[i] = columnGens.get(i).generate(rnd);
+            return CompositeType.build(buffers);
+        };
+    }
+
+    /**
+     * Hacky workaround to make sure different generic {@link Message} types can be used for {@link #MESSAGE_GEN}.
+     */
+    private static Gen<Message<?>> cast(Gen<? extends Message<?>> gen)
+    {
+        return (Gen<Message<?>>) gen;
+    }
+
+    /**
+     * Java's type inference with chaining doesn't work well, so this is used to infer the root type early in cases
+     * where javac can't figure it out
+     */
+    private static <T> Gen<T> gen(Gen<T> fn)
+    {
+        return fn;
+    }
+
+    /**
+     * Uses reflection to generate a toString.  This method is aware of common Cassandra classes and can be used for
+     * generators or tests to provide more details for debugging.
+     */
+    public static String toStringRecursive(Object o)
+    {
+        return ReflectionToStringBuilder.toString(o, new MultilineRecursiveToStringStyle()
+        {
+            private String spacer = "";
+
+            {
+                // commons-lang uses start/end chars that differ from the conventional ones, so switch to the conventional ones
+                setArrayStart("[");
+                setArrayEnd("]");
+                setContentStart("{");
+                setContentEnd("}");
+                setUseIdentityHashCode(false);
+                setUseShortClassName(true);
+            }
+
+            protected boolean accept(Class<?> clazz)
+            {
+                return !clazz.isEnum() // toString enums
+                       && Stream.of(clazz.getDeclaredFields()).anyMatch(f -> !Modifier.isStatic(f.getModifiers())); // if no fields, just toString
+            }
+
+            public void appendDetail(StringBuffer buffer, String fieldName, Object value)
+            {
+                if (value instanceof ByteBuffer)
+                {
+                    value = ByteBufferUtil.bytesToHex((ByteBuffer) value);
+                }
+                else if (value instanceof AbstractType)
+                {
+                    value = SchemaCQLHelper.toCqlType((AbstractType) value);
+                }
+                else if (value instanceof Token || value instanceof InetAddressAndPort || value instanceof FieldIdentifier)
+                {
+                    value = value.toString();
+                }
+                else if (value instanceof TableMetadata)
+                {
+                    // to keep the indentation correct, convert to CQL, then replace each newline with a newline plus
+                    // the indent, and finally prefix the whole string with the indent.
+                    String cql = SchemaCQLHelper.getTableMetadataAsCQL((TableMetadata) value, true, true, false);
+                    cql = NEWLINE_PATTERN.matcher(cql).replaceAll(Matcher.quoteReplacement("\n  " + spacer));
+                    cql = "\n  " + spacer + cql;
+                    value = cql;
+                }
+                super.appendDetail(buffer, fieldName, value);
+            }
+
+            // MultilineRecursiveToStringStyle doesn't look at what was set and instead hard codes the values when it "resets" the level
+            protected void setArrayStart(String arrayStart)
+            {
+                super.setArrayStart(arrayStart.replace("{", "["));
+            }
+
+            protected void setArrayEnd(String arrayEnd)
+            {
+                super.setArrayEnd(arrayEnd.replace("}", "]"));
+            }
+
+            protected void setContentStart(String contentStart)
+            {
+                // use this to infer the spacer since it isn't exposed.
+                String[] split = contentStart.split("\n", 2);
+                spacer = split.length == 2 ? split[1] : "";
+                super.setContentStart(contentStart.replace("[", "{"));
+            }
+
+            protected void setContentEnd(String contentEnd)
+            {
+                super.setContentEnd(contentEnd.replace("]", "}"));
+            }
+        }, true);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java b/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java
new file mode 100644
index 0000000..7753321
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/FixedMonotonicClock.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
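+/**
+ * A {@link MonotonicClock} that always reports the same instant, so clock-derived values are
+ * deterministic in tests.
+ */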
+public final class FixedMonotonicClock implements MonotonicClock
+{
+    public long now()
+    {
+        return 42;
+    }
+
+    public long error()
+    {
+        return 0;
+    }
+
+    public MonotonicClockTranslation translate()
+    {
+        return FakeMonotonicClockTranslation.instance;
+    }
+
+    public boolean isAfter(long instant)
+    {
+        return false;
+    }
+
+    public boolean isAfter(long now, long instant)
+    {
+        return false;
+    }
+
+    private static final class FakeMonotonicClockTranslation implements MonotonicClockTranslation
+    {
+        private static final FakeMonotonicClockTranslation instance = new FakeMonotonicClockTranslation();
+
+        public long fromMillisSinceEpoch(long currentTimeMillis)
+        {
+            return TimeUnit.MILLISECONDS.toNanos(currentTimeMillis);
+        }
+
+        public long toMillisSinceEpoch(long nanoTime)
+        {
+            return TimeUnit.NANOSECONDS.toMillis(nanoTime);
+        }
+
+        public long error()
+        {
+            return 0;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/Generators.java b/test/unit/org/apache/cassandra/utils/Generators.java
index 18df830..43d4335 100644
--- a/test/unit/org/apache/cassandra/utils/Generators.java
+++ b/test/unit/org/apache/cassandra/utils/Generators.java
@@ -1,17 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.cassandra.utils;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.quicktheories.core.Gen;
+import org.quicktheories.core.RandomnessSource;
+import org.quicktheories.generators.SourceDSL;
 import org.quicktheories.impl.Constraint;
 
 public final class Generators
 {
+    private static final Logger logger = LoggerFactory.getLogger(Generators.class);
+
+    private static final Constraint INT_CONSTRAINT = Constraint.between(Integer.MIN_VALUE, Integer.MAX_VALUE);
+    private static final Constraint LONG_CONSTRAINT = Constraint.between(Long.MIN_VALUE, Long.MAX_VALUE);
+
+    private static final int MAX_BLOB_LENGTH = 1 * 1024 * 1024;
+
+    private static final Constraint DNS_DOMAIN_PARTS_CONSTRAINT = Constraint.between(1, 127);
+
+    private static final char[] LETTER_DOMAIN = createLetterDomain();
+    private static final Constraint LETTER_CONSTRAINT = Constraint.between(0, LETTER_DOMAIN.length - 1).withNoShrinkPoint();
+    private static final char[] LETTER_OR_DIGIT_DOMAIN = createLetterOrDigitDomain();
+    private static final Constraint LETTER_OR_DIGIT_CONSTRAINT = Constraint.between(0, LETTER_OR_DIGIT_DOMAIN.length - 1).withNoShrinkPoint();
     private static final char[] REGEX_WORD_DOMAIN = createRegexWordDomain();
+    private static final Constraint REGEX_WORD_CONSTRAINT = Constraint.between(0, REGEX_WORD_DOMAIN.length - 1).withNoShrinkPoint();
+    private static final char[] DNS_DOMAIN_PART_DOMAIN = createDNSDomainPartDomain();
+    private static final Constraint DNS_DOMAIN_PART_CONSTRAINT = Constraint.between(0, DNS_DOMAIN_PART_DOMAIN.length - 1).withNoShrinkPoint();
+
+    public static final Gen<String> IDENTIFIER_GEN = Generators.regexWord(SourceDSL.integers().between(1, 50));
+
+    public static final Gen<UUID> UUID_RANDOM_GEN = rnd -> {
+        long most = rnd.next(Constraint.none());
+        most &= ~(0xfL << 12);   /* clear version        */
+        most |= 0x4L << 12;      /* set to version 4     */
+        long least = rnd.next(Constraint.none());
+        least &= ~(0xc0L << 56); /* clear variant        */
+        least |= 0x80L << 56;    /* set to IETF variant  */
+        return new UUID(most, least);
+    };
+
+    public static final Gen<String> DNS_DOMAIN_NAME = rnd -> {
+        // how many parts to generate
+        int numParts = (int) rnd.next(DNS_DOMAIN_PARTS_CONSTRAINT);
+        int MAX_LENGTH = 253;
+        int MAX_PART_LENGTH = 63;
+        // to make sure the string is within the max allowed length (253), cap each part uniformly
+        Constraint partSizeConstraint = Constraint.between(1, Math.min(Math.max(1, (int) Math.ceil((MAX_LENGTH - numParts) / numParts)), MAX_PART_LENGTH));
+        StringBuilder sb = new StringBuilder(MAX_LENGTH);
+        for (int i = 0; i < numParts; i++)
+        {
+            int partSize = (int) rnd.next(partSizeConstraint);
+            // - and _ are not allowed in the first or last position of a part, so special case these;
+            // also, only use letters for the first char since digits aren't uniformly allowed there
+            sb.append(LETTER_DOMAIN[(int) rnd.next(LETTER_CONSTRAINT)]);
+            for (int j = 1; j < partSize; j++)
+                sb.append(DNS_DOMAIN_PART_DOMAIN[(int) rnd.next(DNS_DOMAIN_PART_CONSTRAINT)]);
+            if (isDash(sb.charAt(sb.length() - 1)))
+            {
+                // need to replace
+                sb.setCharAt(sb.length() - 1, LETTER_OR_DIGIT_DOMAIN[(int) rnd.next(LETTER_OR_DIGIT_CONSTRAINT)]);
+            }
+            sb.append('.'); // the spec allows a trailing '.' in a domain, so no need to worry about removing it
+        }
+        return sb.toString();
+    };
+
+    private static final class Ipv4AddressGen implements Gen<byte[]>
+    {
+        public byte[] generate(RandomnessSource rnd)
+        {
+            byte[] bytes = new byte[4];
+            ByteArrayUtil.putInt(bytes, 0, (int) rnd.next(INT_CONSTRAINT));
+            return bytes;
+        }
+    }
+    public static final Gen<byte[]> IPV4_ADDRESS = new Ipv4AddressGen();
+    private static final class Ipv6AddressGen implements Gen<byte[]>
+    {
+        public byte[] generate(RandomnessSource rnd)
+        {
+            byte[] bytes = new byte[16];
+            ByteArrayUtil.putLong(bytes, 0, rnd.next(LONG_CONSTRAINT));
+            ByteArrayUtil.putLong(bytes, 8, rnd.next(LONG_CONSTRAINT));
+            return bytes;
+        }
+    }
+    public static final Gen<byte[]> IPV6_ADDRESS = new Ipv6AddressGen();
+
+    public static final Gen<InetAddress> INET_4_ADDRESS_RESOLVED_GEN = rnd -> {
+        try
+        {
+            return InetAddress.getByAddress(DNS_DOMAIN_NAME.generate(rnd), IPV4_ADDRESS.generate(rnd));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    };
+
+    public static final Gen<InetAddress> INET_4_ADDRESS_UNRESOLVED_GEN = rnd -> {
+        try
+        {
+            return InetAddress.getByAddress(null, IPV4_ADDRESS.generate(rnd));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    };
+    public static final Gen<InetAddress> INET_4_ADDRESS_GEN = INET_4_ADDRESS_RESOLVED_GEN.mix(INET_4_ADDRESS_UNRESOLVED_GEN);
+
+    public static final Gen<InetAddress> INET_6_ADDRESS_RESOLVED_GEN = rnd -> {
+        try
+        {
+            return InetAddress.getByAddress(DNS_DOMAIN_NAME.generate(rnd), IPV6_ADDRESS.generate(rnd));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    };
+
+    public static final Gen<InetAddress> INET_6_ADDRESS_UNRESOLVED_GEN = rnd -> {
+        try
+        {
+            return InetAddress.getByAddress(null, IPV6_ADDRESS.generate(rnd));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    };
+    public static final Gen<InetAddress> INET_6_ADDRESS_GEN = INET_6_ADDRESS_RESOLVED_GEN.mix(INET_6_ADDRESS_UNRESOLVED_GEN);
+    public static final Gen<InetAddress> INET_ADDRESS_GEN = INET_4_ADDRESS_GEN.mix(INET_6_ADDRESS_GEN);
+    public static final Gen<InetAddress> INET_ADDRESS_UNRESOLVED_GEN = INET_4_ADDRESS_UNRESOLVED_GEN.mix(INET_6_ADDRESS_UNRESOLVED_GEN);
+
+    /**
+     * Implements a valid utf-8 generator.
+     *
+     * Implementation note: currently relies on getBytes to strip out invalid UTF-8 chars, so it is slow
+     */
+    public static final Gen<String> UTF_8_GEN = utf8(0, 1024);
+
+    // time generators
+    // all times are bounded to within 50 years before and after a fixed reference point: Aug 20th, 2020 UTC
+    public static final Gen<Timestamp> TIMESTAMP_GEN;
+    public static final Gen<Date> DATE_GEN;
+
+    static
+    {
+        ZonedDateTime now = ZonedDateTime.of(2020, 8, 20,
+                                             0, 0, 0, 0, ZoneOffset.UTC);
+        ZonedDateTime startOfTime = now.minusYears(50);
+        ZonedDateTime endOfDays = now.plusYears(50);
+        Constraint millisConstraint = Constraint.between(startOfTime.toInstant().toEpochMilli(), endOfDays.toInstant().toEpochMilli());
+        Constraint nanosInSecondConstraint = Constraint.between(0, 999999999);
+        TIMESTAMP_GEN = rnd -> {
+            Timestamp ts = new Timestamp(rnd.next(millisConstraint));
+            ts.setNanos((int) rnd.next(nanosInSecondConstraint));
+            return ts;
+        };
+        DATE_GEN = TIMESTAMP_GEN.map(t -> new Date(t.getTime()));
+    }
 
     private Generators()
     {
 
     }
 
+    /**
+     * Generates values which match the {@link Predicate}.  The main difference with {@link Gen#assuming(Predicate)}
+     * is that this does not stop if not enough matches are found.
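+     *
+     * <p>Example (sketch), mirroring how generated column types are restricted in {@code CassandraGenerators}:
+     * <pre>{@code
+     * Gen<AbstractType<?>> gen = Generators.filter(AbstractTypeGenerators.typeGen(), t -> t != EmptyType.instance);
+     * }</pre>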
+     */
+    public static <T> Gen<T> filter(Gen<T> gen, Predicate<T> fn)
+    {
+        return new FilterGen<>(gen, fn);
+    }
+
+    /**
+     * Generates values which match the {@link Predicate}.  The main difference with {@link Gen#assuming(Predicate)}
+     * is that failing is controlled at the generator level.
+     */
+    public static <T> Gen<T> filter(Gen<T> gen, int maxAttempts, Predicate<T> fn)
+    {
+        return new BoundedFilterGen<>(gen, maxAttempts, fn);
+    }
+
     public static Gen<String> regexWord(Gen<Integer> sizes)
     {
         return string(sizes, REGEX_WORD_DOMAIN);
@@ -29,7 +232,8 @@ public final class Generators
         Gen<char[]> gen = td -> {
             int size = sizes.generate(td);
             char[] is = new char[size];
-            for (int i = 0; i != size; i++) {
+            for (int i = 0; i != size; i++)
+            {
                 int idx = (int) td.next(constraints);
                 is[i] = domain[idx];
             }
@@ -39,14 +243,27 @@ public final class Generators
         return gen;
     }
 
-    private static char[] createRegexWordDomain()
+    private static char[] createLetterDomain()
+    {
+        // [a-zA-Z]
+        char[] domain = new char[26 * 2];
+
+        int offset = 0;
+        // A-Z
+        for (int c = 65; c < 91; c++)
+            domain[offset++] = (char) c;
+        // a-z
+        for (int c = 97; c < 123; c++)
+            domain[offset++] = (char) c;
+        return domain;
+    }
+
+    private static char[] createLetterOrDigitDomain()
     {
-        // \w == [a-zA-Z_0-9]
-        char[] domain = new char[26 * 2 + 10 + 1];
+        // [a-zA-Z0-9]
+        char[] domain = new char[26 * 2 + 10];
 
         int offset = 0;
-        // _
-        domain[offset++] = (char) 95;
         // 0-9
         for (int c = 48; c < 58; c++)
             domain[offset++] = (char) c;
@@ -58,4 +275,129 @@ public final class Generators
             domain[offset++] = (char) c;
         return domain;
     }
+
+    private static char[] createRegexWordDomain()
+    {
+        // \w == [a-zA-Z_0-9] the only difference with letterOrDigit is the addition of _
+        return ArrayUtils.add(createLetterOrDigitDomain(), (char) 95); // 95 is _
+    }
+
+    private static char[] createDNSDomainPartDomain()
+    {
+        // [a-zA-Z0-9_-] the only difference with regex word is the addition of -
+        return ArrayUtils.add(createRegexWordDomain(), (char) 45); // 45 is -
+    }
+
+    public static Gen<ByteBuffer> bytes(int min, int max)
+    {
+        if (min < 0)
+            throw new IllegalArgumentException("Asked for negative bytes; given " + min);
+        if (max > MAX_BLOB_LENGTH)
+            throw new IllegalArgumentException("Requested bytes larger than shared bytes allowed; " +
+                                               "asked for " + max + " but only have " + MAX_BLOB_LENGTH);
+        if (max < min)
+            throw new IllegalArgumentException("Max was less than min; given min=" + min + " and max=" + max);
+        Constraint sizeConstraint = Constraint.between(min, max);
+        return rnd -> {
+            // since Constraint is immutable and the max was checked, it's already proven to fit in an int
+            int size = (int) rnd.next(sizeConstraint);
+            // to add more randomness, also shift the offset into the array so the same size doesn't yield the same bytes
+            int offset = (int) rnd.next(Constraint.between(0, MAX_BLOB_LENGTH - size));
+
+            return ByteBuffer.wrap(LazySharedBlob.SHARED_BYTES, offset, size);
+        };
+    }
+
+    /**
+     * Implements a valid utf-8 generator.
+     *
+     * Implementation note: currently relies on getBytes to strip out invalid UTF-8 chars, so it is slow
+     */
+    public static Gen<String> utf8(int min, int max)
+    {
+        return SourceDSL.strings()
+                 .basicMultilingualPlaneAlphabet()
+                 .ofLengthBetween(min, max)
+                 .map(s -> new String(s.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8));
+    }
+
+    private static boolean isDash(char c)
+    {
+        switch (c)
+        {
+            case 45: // -
+            case 95: // _
+                return true;
+            default:
+                return false;
+        }
+    }
+
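+    /**
+     * Holder class that lazily allocates the shared blob backing {@link #bytes(int, int)}; the seed used to fill
+     * it can be pinned with the {@code cassandra.test.blob.shared.seed} system property for reproducible runs.
+     */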
+    private static final class LazySharedBlob
+    {
+        private static final byte[] SHARED_BYTES;
+
+        static
+        {
+            long blobSeed = Long.parseLong(System.getProperty("cassandra.test.blob.shared.seed", Long.toString(System.currentTimeMillis())));
+            logger.info("Shared blob Gen used seed {}", blobSeed);
+
+            Random random = new Random(blobSeed);
+            byte[] bytes = new byte[MAX_BLOB_LENGTH];
+            random.nextBytes(bytes);
+
+            SHARED_BYTES = bytes;
+        }
+    }
+
+    private static final class FilterGen<T> implements Gen<T>
+    {
+        private final Gen<T> gen;
+        private final Predicate<T> fn;
+
+        private FilterGen(Gen<T> gen, Predicate<T> fn)
+        {
+            this.gen = gen;
+            this.fn = fn;
+        }
+
+        public T generate(RandomnessSource rs)
+        {
+            while (true)
+            {
+                T value = gen.generate(rs);
+                if (fn.test(value))
+                {
+                    return value;
+                }
+            }
+        }
+    }
+
+    private static final class BoundedFilterGen<T> implements Gen<T>
+    {
+        private final Gen<T> gen;
+        private final int maxAttempts;
+        private final Predicate<T> fn;
+
+        private BoundedFilterGen(Gen<T> gen, int maxAttempts, Predicate<T> fn)
+        {
+            this.gen = gen;
+            this.maxAttempts = maxAttempts;
+            this.fn = fn;
+        }
+
+        public T generate(RandomnessSource rs)
+        {
+            for (int i = 0; i < maxAttempts; i++)
+            {
+                T value = gen.generate(rs);
+                if (fn.test(value))
+                {
+                    return value;
+                }
+            }
+            throw new IllegalStateException("Gave up trying to find values matching assumptions after " + maxAttempts + " attempts");
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/utils/GeneratorsTest.java b/test/unit/org/apache/cassandra/utils/GeneratorsTest.java
new file mode 100644
index 0000000..b91421c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/GeneratorsTest.java
@@ -0,0 +1,30 @@
+package org.apache.cassandra.utils;
+
+import com.google.common.net.InternetDomainName;
+import org.junit.Test;
+
+import org.assertj.core.api.Assertions;
+
+import static org.quicktheories.QuickTheory.qt;
+
+public class GeneratorsTest
+{
+    @Test
+    public void randomUUID()
+    {
+        qt().forAll(Generators.UUID_RANDOM_GEN).checkAssert(uuid -> {
+            Assertions.assertThat(uuid.version())
+                      .as("version was not random uuid")
+                      .isEqualTo(4);
+            Assertions.assertThat(uuid.variant())
+                      .as("variant not set to IETF (2)")
+                      .isEqualTo(2);
+        });
+    }
+
+    @Test
+    public void dnsDomainName()
+    {
+        qt().forAll(Generators.DNS_DOMAIN_NAME).checkAssert(InternetDomainName::from);
+    }
+}
\ No newline at end of file

