You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2017/11/21 12:15:30 UTC

cassandra git commit: Make LWTs send resultset metadata on every request

Repository: cassandra
Updated Branches:
  refs/heads/trunk 5792b667e -> 7eb915097


Make LWTs send resultset metadata on every request

Patch by Alex Petrov; reviewed by Robert Stupp for CASSANDRA-13992

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7eb91509
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7eb91509
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7eb91509

Branch: refs/heads/trunk
Commit: 7eb915097dc3e34e1bb4ef96e6bd8eb67d574622
Parents: 5792b66
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Nov 13 08:56:14 2017 +0100
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Nov 21 13:14:51 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 ...iver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar | Bin 0 -> 2618371 bytes
 ...e-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar | Bin 2613656 -> 0 bytes
 .../org/apache/cassandra/cql3/CQLStatement.java |   8 +
 .../org/apache/cassandra/cql3/ResultSet.java    |  20 ++
 .../cql3/statements/BatchStatement.java         |   5 +
 .../cql3/statements/ModificationStatement.java  |   7 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/ExecuteMessage.java      |  21 +-
 .../transport/messages/ResultMessage.java       |   8 +
 .../org/apache/cassandra/cql3/CQLTester.java    |   6 +
 .../cassandra/cql3/PreparedStatementsTest.java  | 207 ++++++++++++++++++-
 12 files changed, 276 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03f5de8..6da2712 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
  * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
  * Introduce leaf-only iterator (CASSANDRA-9988)
  * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..a7be9cb
Binary files /dev/null and b/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
deleted file mode 100644
index d95a811..0000000
Binary files a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 901ecd4..f3b2090 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -67,4 +67,12 @@ public interface CQLStatement
      * @return functions all functions found (may contain duplicates)
      */
     public Iterable<Function> getFunctions();
+
+    /**
+     * Whether or not this CQL Statement has LWT conditions
+     */
+    default public boolean hasConditions()
+    {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 7dd51fd..9d79dea 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -28,6 +28,8 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.SelectStatement;
@@ -249,6 +251,24 @@ public class ResultSet
             return names == null ? columnCount : names.size();
         }
 
+        @VisibleForTesting
+        public EnumSet<Flag> getFlags()
+        {
+            return flags;
+        }
+
+        @VisibleForTesting
+        public int getColumnCount()
+        {
+            return columnCount;
+        }
+
+        @VisibleForTesting
+        public PagingState getPagingState()
+        {
+            return pagingState;
+        }
+
         /**
          * Adds the specified columns which will not be serialized.
          *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 87204f8..7497f47 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -476,6 +476,11 @@ public class BatchStatement implements CQLStatement
         return Pair.create(casRequest, columnsWithConditions);
     }
 
+    public boolean hasConditions()
+    {
+        return hasConditions;
+    }
+
     public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         if (hasConditions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4191285..decb99f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -66,6 +67,8 @@ public abstract class ModificationStatement implements CQLStatement
 {
     protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
 
+    private final static MD5Digest EMPTY_HASH = MD5Digest.wrap(new byte[] {});
+
     public static final String CUSTOM_EXPRESSIONS_NOT_ALLOWED =
         "Custom index expressions cannot be used in WHERE clauses for UPDATE or DELETE statements";
 
@@ -502,7 +505,7 @@ public abstract class ModificationStatement implements CQLStatement
         List<ColumnSpecification> specs = new ArrayList<>();
         specs.add(casResultColumnSpecification(ksName, cfName));
 
-        return new ResultSet.ResultMetadata(specs);
+        return new ResultSet.ResultMetadata(EMPTY_HASH, specs);
     }
 
     private static ColumnSpecification casResultColumnSpecification(String ksName, String cfName)
@@ -547,7 +550,7 @@ public abstract class ModificationStatement implements CQLStatement
             row.addAll(right.rows.get(i));
             rows.add(row);
         }
-        return new ResultSet(new ResultSet.ResultMetadata(specs), rows);
+        return new ResultSet(new ResultSet.ResultMetadata(EMPTY_HASH, specs), rows);
     }
 
     private static ResultSet buildCasFailureResultSet(RowIterator partition, Iterable<ColumnMetadata> columnsWithConditions, boolean isBatch, QueryOptions options)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 9c1fb07..07463e2 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -119,7 +119,7 @@ public class SimpleClient implements Closeable
         this(host, port, new EncryptionOptions());
     }
 
-    public void connect(boolean useCompression) throws IOException
+    public SimpleClient connect(boolean useCompression) throws IOException
     {
         establishConnection();
 
@@ -131,6 +131,8 @@ public class SimpleClient implements Closeable
             connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
         }
         execute(new StartupMessage(options));
+
+        return this;
     }
 
     public void setEventHandler(EventHandler eventHandler)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index a8fd2a0..0b93d16 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -27,6 +27,8 @@ import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
@@ -167,14 +169,21 @@ public class ExecuteMessage extends Message.Request
                 ResultMessage.Rows rows = (ResultMessage.Rows) response;
 
                 ResultSet.ResultMetadata resultMetadata = rows.result.metadata;
+
                 if (options.getProtocolVersion().isGreaterOrEqualTo(ProtocolVersion.V5))
                 {
-                    // Starting with V5 we can rely on the result metadata id coming with execute message in order to
-                    // check if there was a change, comparing it with metadata that's about to be returned to client.
-                    if (!resultMetadata.getResultMetadataId().equals(resultMetadataId))
-                        resultMetadata.setMetadataChanged();
-                    else if (options.skipMetadata())
-                        resultMetadata.setSkipMetadata();
+                    // For LWTs, always send a resultset metadata but avoid setting a metadata changed flag. This way
+                    // Client will always receive fresh metadata, but will avoid caching and reusing it. See CASSANDRA-13992
+                    // for details.
+                    if (!statement.hasConditions())
+                    {
+                        // Starting with V5 we can rely on the result metadata id coming with execute message in order to
+                        // check if there was a change, comparing it with metadata that's about to be returned to client.
+                        if (!resultMetadata.getResultMetadataId().equals(resultMetadataId))
+                            resultMetadata.setMetadataChanged();
+                        else if (options.skipMetadata())
+                            resultMetadata.setSkipMetadata();
+                    }
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index d8aefbe..b989a7d 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.transport.messages;
 
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -278,6 +280,12 @@ public abstract class ResultMessage extends Message.Response
             this.resultMetadata = resultMetadata;
         }
 
+        @VisibleForTesting
+        public Prepared withResultMetadata(ResultSet.ResultMetadata resultMetadata)
+        {
+            return new Prepared(statementId, resultMetadata.getResultMetadataId(), metadata, resultMetadata);
+        }
+
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index b038ce0..6993a65 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -47,6 +47,7 @@ import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.functions.FunctionName;
@@ -845,6 +846,11 @@ public abstract class CQLTester
         return sessions.get(protocolVersion);
     }
 
+    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression) throws IOException
+    {
+        return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression);
+    }
+
     protected String formatQuery(String query)
     {
         return formatQuery(KEYSPACE, query);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index f843965..0a314da 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -17,16 +17,29 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.junit.Before;
 import org.junit.Test;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.SyntaxError;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.serializers.BooleanSerializer;
+import org.apache.cassandra.serializers.Int32Serializer;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class PreparedStatementsTest extends CQLTester
@@ -261,13 +274,203 @@ public class PreparedStatementsTest extends CQLTester
     }
 
     @Test
+    public void testMetadataFlagsWithLWTs() throws Throwable
+    {
+        // Verify the behavior of CASSANDRA-10786 (result metadata IDs) on the protocol level.
+        // Tests are against an LWT statement and a "regular" SELECT statement.
+        // The fundamental difference between a SELECT and an LWT statement is that the result metadata
+        // of an LWT can change between invocations - therefore we always return the resultset metadata
+        // for LWTs. For "normal" SELECTs, the resultset metadata can only change when DDLs happen
+        // (aka the famous prepared 'SELECT * FROM ks.tab' stops working after the schema of that table
+        // changes). In those cases, the Result.Rows message contains a METADATA_CHANGED flag to tell
+        // clients that the cached metadata for this statement has changed and is included in the result,
+        // whereas the resultset metadata is omitted, if the metadata ID sent with the EXECUTE message
+        // matches the one for the (current) schema.
+        // Note: this test does not cover all aspects of 10786 (yet) - it was intended to test the
+        // changes for CASSANDRA-13992.
+
+        createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
+
+        try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false))
+        {
+            ResultMessage.Prepared prepUpdate = simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?",
+                                                                                   keyspace(), currentTable()));
+            ResultMessage.Prepared prepSelect = simpleClient.prepare(String.format("SELECT * FROM %s.%s WHERE pk = ?",
+                                                                                   keyspace(), currentTable()));
+
+            // This is a _successful_ LWT update
+            verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+                                              prepUpdate,
+                                              Arrays.asList(Int32Serializer.instance.serialize(10),
+                                                            Int32Serializer.instance.serialize(20),
+                                                            Int32Serializer.instance.serialize(1)),
+                                              Arrays.asList("[applied]"),
+                                              Arrays.asList(BooleanSerializer.instance.serialize(true)));
+
+            prepSelect = verifyMetadataFlagsWithLWTsSelect(simpleClient,
+                                                           prepSelect,
+                                                           Arrays.asList("pk", "v1", "v2"),
+                                                           Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                                         Int32Serializer.instance.serialize(10),
+                                                                         Int32Serializer.instance.serialize(20)),
+                                                           EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC));
+
+            // This is an _unsuccessful_ LWT update (as the condition fails)
+            verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+                                              prepUpdate,
+                                              Arrays.asList(Int32Serializer.instance.serialize(10),
+                                                            Int32Serializer.instance.serialize(20),
+                                                            Int32Serializer.instance.serialize(1)),
+                                              Arrays.asList("[applied]", "v1"),
+                                              Arrays.asList(BooleanSerializer.instance.serialize(false),
+                                                            Int32Serializer.instance.serialize(10)));
+
+            prepSelect = verifyMetadataFlagsWithLWTsSelect(simpleClient,
+                                                           prepSelect,
+                                                           Arrays.asList("pk", "v1", "v2"),
+                                                           Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                                         Int32Serializer.instance.serialize(10),
+                                                                         Int32Serializer.instance.serialize(20)),
+                                                           EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC));
+
+            // force a schema change on that table
+            simpleClient.execute(String.format("ALTER TABLE %s.%s ADD v3 int",
+                                               keyspace(), currentTable()),
+                                 ConsistencyLevel.LOCAL_ONE);
+
+            try
+            {
+                simpleClient.executePrepared(prepUpdate,
+                                             Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                           Int32Serializer.instance.serialize(30),
+                                                           Int32Serializer.instance.serialize(10)),
+                                             ConsistencyLevel.LOCAL_ONE);
+                fail();
+            }
+            catch (RuntimeException re)
+            {
+                assertTrue(re.getCause() instanceof PreparedQueryNotFoundException);
+                // the prepared statement has been removed from the pstmt cache, need to re-prepare it
+                // only prepare the statement on the server side but don't set the variable
+                simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?",
+                                                   keyspace(), currentTable()));
+            }
+            try
+            {
+                simpleClient.executePrepared(prepSelect,
+                                             Arrays.asList(Int32Serializer.instance.serialize(1)),
+                                             ConsistencyLevel.LOCAL_ONE);
+                fail();
+            }
+            catch (RuntimeException re)
+            {
+                assertTrue(re.getCause() instanceof PreparedQueryNotFoundException);
+                // the prepared statement has been removed from the pstmt cache, need to re-prepare it
+                // only prepare the statement on the server side but don't set the variable
+                simpleClient.prepare(String.format("SELECT * FROM %s.%s WHERE pk = ?",
+                                                   keyspace(), currentTable()));
+            }
+
+            // This is a _successful_ LWT update
+            verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+                                              prepUpdate,
+                                              Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                            Int32Serializer.instance.serialize(30),
+                                                            Int32Serializer.instance.serialize(10)),
+                                              Arrays.asList("[applied]"),
+                                              Arrays.asList(BooleanSerializer.instance.serialize(true)));
+
+            // Re-assign prepSelect here, as the resultset metadata changed to submit the updated
+            // resultset-metadata-ID in the next SELECT. This behavior does not apply to LWT statements.
+            prepSelect = verifyMetadataFlagsWithLWTsSelect(simpleClient,
+                                                           prepSelect,
+                                                           Arrays.asList("pk", "v1", "v2", "v3"),
+                                                           Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                                         Int32Serializer.instance.serialize(1),
+                                                                         Int32Serializer.instance.serialize(30),
+                                                                         null),
+                                                           EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC,
+                                                                      org.apache.cassandra.cql3.ResultSet.Flag.METADATA_CHANGED));
+
+            // This is an _unsuccessful_ LWT update (as the condition fails)
+            verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+                                              prepUpdate,
+                                              Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                            Int32Serializer.instance.serialize(30),
+                                                            Int32Serializer.instance.serialize(10)),
+                                              Arrays.asList("[applied]", "v1"),
+                                              Arrays.asList(BooleanSerializer.instance.serialize(false),
+                                                            Int32Serializer.instance.serialize(1)));
+
+            verifyMetadataFlagsWithLWTsSelect(simpleClient,
+                                              prepSelect,
+                                              Arrays.asList("pk", "v1", "v2", "v3"),
+                                              Arrays.asList(Int32Serializer.instance.serialize(1),
+                                                            Int32Serializer.instance.serialize(1),
+                                                            Int32Serializer.instance.serialize(30),
+                                                            null),
+                                              EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC));
+        }
+    }
+
+    private ResultMessage.Prepared verifyMetadataFlagsWithLWTsSelect(SimpleClient simpleClient,
+                                                                     ResultMessage.Prepared prepSelect,
+                                                                     List<String> columnNames,
+                                                                     List<ByteBuffer> expectedRow,
+                                                                     EnumSet<org.apache.cassandra.cql3.ResultSet.Flag> expectedFlags)
+    {
+        ResultMessage result = simpleClient.executePrepared(prepSelect,
+                                                            Collections.singletonList(Int32Serializer.instance.serialize(1)),
+                                                            ConsistencyLevel.LOCAL_ONE);
+        ResultMessage.Rows rows = (ResultMessage.Rows) result;
+        EnumSet<org.apache.cassandra.cql3.ResultSet.Flag> resultFlags = rows.result.metadata.getFlags();
+        assertEquals(expectedFlags,
+                     resultFlags);
+        assertEquals(columnNames.size(),
+                     rows.result.metadata.getColumnCount());
+        assertEquals(columnNames,
+                     rows.result.metadata.names.stream().map(cs -> cs.name.toString()).collect(Collectors.toList()));
+        assertEquals(1,
+                     rows.result.size());
+        assertEquals(expectedRow,
+                     rows.result.rows.get(0));
+
+        if (resultFlags.contains(org.apache.cassandra.cql3.ResultSet.Flag.METADATA_CHANGED))
+            prepSelect = prepSelect.withResultMetadata(rows.result.metadata);
+        return prepSelect;
+    }
+
+    private void verifyMetadataFlagsWithLWTsUpdate(SimpleClient simpleClient,
+                                                   ResultMessage.Prepared prepUpdate,
+                                                   List<ByteBuffer> params,
+                                                   List<String> columnNames,
+                                                   List<ByteBuffer> expectedRow)
+    {
+        ResultMessage result = simpleClient.executePrepared(prepUpdate,
+                                                            params,
+                                                            ConsistencyLevel.LOCAL_ONE);
+        ResultMessage.Rows rows = (ResultMessage.Rows) result;
+        EnumSet<org.apache.cassandra.cql3.ResultSet.Flag> resultFlags = rows.result.metadata.getFlags();
+        assertEquals(EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC),
+                     resultFlags);
+        assertEquals(columnNames.size(),
+                     rows.result.metadata.getColumnCount());
+        assertEquals(columnNames,
+                     rows.result.metadata.names.stream().map(cs -> cs.name.toString()).collect(Collectors.toList()));
+        assertEquals(1,
+                     rows.result.size());
+        assertEquals(expectedRow,
+                     rows.result.rows.get(0));
+    }
+
+    @Test
     public void testPrepareWithLWT() throws Throwable
     {
         testPrepareWithLWT(ProtocolVersion.V4);
         testPrepareWithLWT(ProtocolVersion.V5);
     }
 
-
     private void testPrepareWithLWT(ProtocolVersion version) throws Throwable
     {
         Session session = sessionNet(version);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org