You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2017/11/21 12:15:30 UTC
cassandra git commit: Make LWTs send resultset metadata on every
request
Repository: cassandra
Updated Branches:
refs/heads/trunk 5792b667e -> 7eb915097
Make LWTs send resultset metadata on every request
Patch by Alex Petrov; reviewed by Robert Stupp for CASSANDRA-13992
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7eb91509
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7eb91509
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7eb91509
Branch: refs/heads/trunk
Commit: 7eb915097dc3e34e1bb4ef96e6bd8eb67d574622
Parents: 5792b66
Author: Alex Petrov <ol...@gmail.com>
Authored: Mon Nov 13 08:56:14 2017 +0100
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Nov 21 13:14:51 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
...iver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar | Bin 0 -> 2618371 bytes
...e-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar | Bin 2613656 -> 0 bytes
.../org/apache/cassandra/cql3/CQLStatement.java | 8 +
.../org/apache/cassandra/cql3/ResultSet.java | 20 ++
.../cql3/statements/BatchStatement.java | 5 +
.../cql3/statements/ModificationStatement.java | 7 +-
.../cassandra/transport/SimpleClient.java | 4 +-
.../transport/messages/ExecuteMessage.java | 21 +-
.../transport/messages/ResultMessage.java | 8 +
.../org/apache/cassandra/cql3/CQLTester.java | 6 +
.../cassandra/cql3/PreparedStatementsTest.java | 207 ++++++++++++++++++-
12 files changed, 276 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03f5de8..6da2712 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
* Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
* Introduce leaf-only iterator (CASSANDRA-9988)
* Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..a7be9cb
Binary files /dev/null and b/lib/cassandra-driver-core-3.3.2-0461ed35-SNAPSHOT-shaded.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
deleted file mode 100644
index d95a811..0000000
Binary files a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 901ecd4..f3b2090 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -67,4 +67,12 @@ public interface CQLStatement
* @return functions all functions found (may contain duplicates)
*/
public Iterable<Function> getFunctions();
+
+ /**
+ * Whether or not this CQL Statement has LWT conditions
+ */
+ default public boolean hasConditions()
+ {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 7dd51fd..9d79dea 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -28,6 +28,8 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
+import com.google.common.annotations.VisibleForTesting;
+
import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.cql3.statements.SelectStatement;
@@ -249,6 +251,24 @@ public class ResultSet
return names == null ? columnCount : names.size();
}
+ @VisibleForTesting
+ public EnumSet<Flag> getFlags()
+ {
+ return flags;
+ }
+
+ @VisibleForTesting
+ public int getColumnCount()
+ {
+ return columnCount;
+ }
+
+ @VisibleForTesting
+ public PagingState getPagingState()
+ {
+ return pagingState;
+ }
+
/**
* Adds the specified columns which will not be serialized.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 87204f8..7497f47 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -476,6 +476,11 @@ public class BatchStatement implements CQLStatement
return Pair.create(casRequest, columnsWithConditions);
}
+ public boolean hasConditions()
+ {
+ return hasConditions;
+ }
+
public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
if (hasConditions)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4191285..decb99f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -66,6 +67,8 @@ public abstract class ModificationStatement implements CQLStatement
{
protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
+ private final static MD5Digest EMPTY_HASH = MD5Digest.wrap(new byte[] {});
+
public static final String CUSTOM_EXPRESSIONS_NOT_ALLOWED =
"Custom index expressions cannot be used in WHERE clauses for UPDATE or DELETE statements";
@@ -502,7 +505,7 @@ public abstract class ModificationStatement implements CQLStatement
List<ColumnSpecification> specs = new ArrayList<>();
specs.add(casResultColumnSpecification(ksName, cfName));
- return new ResultSet.ResultMetadata(specs);
+ return new ResultSet.ResultMetadata(EMPTY_HASH, specs);
}
private static ColumnSpecification casResultColumnSpecification(String ksName, String cfName)
@@ -547,7 +550,7 @@ public abstract class ModificationStatement implements CQLStatement
row.addAll(right.rows.get(i));
rows.add(row);
}
- return new ResultSet(new ResultSet.ResultMetadata(specs), rows);
+ return new ResultSet(new ResultSet.ResultMetadata(EMPTY_HASH, specs), rows);
}
private static ResultSet buildCasFailureResultSet(RowIterator partition, Iterable<ColumnMetadata> columnsWithConditions, boolean isBatch, QueryOptions options)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 9c1fb07..07463e2 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -119,7 +119,7 @@ public class SimpleClient implements Closeable
this(host, port, new EncryptionOptions());
}
- public void connect(boolean useCompression) throws IOException
+ public SimpleClient connect(boolean useCompression) throws IOException
{
establishConnection();
@@ -131,6 +131,8 @@ public class SimpleClient implements Closeable
connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
}
execute(new StartupMessage(options));
+
+ return this;
}
public void setEventHandler(EventHandler eventHandler)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index a8fd2a0..0b93d16 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -27,6 +27,8 @@ import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.cql3.statements.UpdateStatement;
import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
@@ -167,14 +169,21 @@ public class ExecuteMessage extends Message.Request
ResultMessage.Rows rows = (ResultMessage.Rows) response;
ResultSet.ResultMetadata resultMetadata = rows.result.metadata;
+
if (options.getProtocolVersion().isGreaterOrEqualTo(ProtocolVersion.V5))
{
- // Starting with V5 we can rely on the result metadata id coming with execute message in order to
- // check if there was a change, comparing it with metadata that's about to be returned to client.
- if (!resultMetadata.getResultMetadataId().equals(resultMetadataId))
- resultMetadata.setMetadataChanged();
- else if (options.skipMetadata())
- resultMetadata.setSkipMetadata();
+ // For LWTs, always send a resultset metadata but avoid setting a metadata changed flag. This way
+ // Client will always receive fresh metadata, but will avoid caching and reusing it. See CASSANDRA-13992
+ // for details.
+ if (!statement.hasConditions())
+ {
+ // Starting with V5 we can rely on the result metadata id coming with execute message in order to
+ // check if there was a change, comparing it with metadata that's about to be returned to client.
+ if (!resultMetadata.getResultMetadataId().equals(resultMetadataId))
+ resultMetadata.setMetadataChanged();
+ else if (options.skipMetadata())
+ resultMetadata.setSkipMetadata();
+ }
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index d8aefbe..b989a7d 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.transport.messages;
+import com.google.common.annotations.VisibleForTesting;
+
import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.ColumnSpecification;
@@ -278,6 +280,12 @@ public abstract class ResultMessage extends Message.Response
this.resultMetadata = resultMetadata;
}
+ @VisibleForTesting
+ public Prepared withResultMetadata(ResultSet.ResultMetadata resultMetadata)
+ {
+ return new Prepared(statementId, resultMetadata.getResultMetadataId(), metadata, resultMetadata);
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index b038ce0..6993a65 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -47,6 +47,7 @@ import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.functions.FunctionName;
@@ -845,6 +846,11 @@ public abstract class CQLTester
return sessions.get(protocolVersion);
}
+ protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression) throws IOException
+ {
+ return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression);
+ }
+
protected String formatQuery(String query)
{
return formatQuery(KEYSPACE, query);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7eb91509/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index f843965..0a314da 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -17,16 +17,29 @@
*/
package org.apache.cassandra.cql3;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
import org.junit.Before;
import org.junit.Test;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.SyntaxError;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.serializers.BooleanSerializer;
+import org.apache.cassandra.serializers.Int32Serializer;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.ResultMessage;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class PreparedStatementsTest extends CQLTester
@@ -261,13 +274,203 @@ public class PreparedStatementsTest extends CQLTester
}
@Test
+ public void testMetadataFlagsWithLWTs() throws Throwable
+ {
+ // Verify the behavior of CASSANDRA-10786 (result metadata IDs) on the protocol level.
+ // Tests are against an LWT statement and a "regular" SELECT statement.
+ // The fundamental difference between a SELECT and an LWT statement is that the result metadata
+ // of an LWT can change between invocations - therefore we always return the resultset metadata
+ // for LWTs. For "normal" SELECTs, the resultset metadata can only change when DDLs happen
+ // (aka the famous prepared 'SELECT * FROM ks.tab' stops working after the schema of that table
+ // changes). In those cases, the Result.Rows message contains a METADATA_CHANGED flag to tell
+ // clients that the cached metadata for this statement has changed and is included in the result,
+ // whereas the resultset metadata is omitted, if the metadata ID sent with the EXECUTE message
+ // matches the one for the (current) schema.
+ // Note: this test does not cover all aspects of 10786 (yet) - it was intended to test the
+ // changes for CASSANDRA-13992.
+
+ createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))");
+ execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
+
+ try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false))
+ {
+ ResultMessage.Prepared prepUpdate = simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?",
+ keyspace(), currentTable()));
+ ResultMessage.Prepared prepSelect = simpleClient.prepare(String.format("SELECT * FROM %s.%s WHERE pk = ?",
+ keyspace(), currentTable()));
+
+ // This is a _successful_ LWT update
+ verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+ prepUpdate,
+ Arrays.asList(Int32Serializer.instance.serialize(10),
+ Int32Serializer.instance.serialize(20),
+ Int32Serializer.instance.serialize(1)),
+ Arrays.asList("[applied]"),
+ Arrays.asList(BooleanSerializer.instance.serialize(true)));
+
+ prepSelect = verifyMetadataFlagsWithLWTsSelect(simpleClient,
+ prepSelect,
+ Arrays.asList("pk", "v1", "v2"),
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(10),
+ Int32Serializer.instance.serialize(20)),
+ EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC));
+
+ // This is an _unsuccessful_ LWT update (as the condition fails)
+ verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+ prepUpdate,
+ Arrays.asList(Int32Serializer.instance.serialize(10),
+ Int32Serializer.instance.serialize(20),
+ Int32Serializer.instance.serialize(1)),
+ Arrays.asList("[applied]", "v1"),
+ Arrays.asList(BooleanSerializer.instance.serialize(false),
+ Int32Serializer.instance.serialize(10)));
+
+ prepSelect = verifyMetadataFlagsWithLWTsSelect(simpleClient,
+ prepSelect,
+ Arrays.asList("pk", "v1", "v2"),
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(10),
+ Int32Serializer.instance.serialize(20)),
+ EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC));
+
+ // force a schema change on that table
+ simpleClient.execute(String.format("ALTER TABLE %s.%s ADD v3 int",
+ keyspace(), currentTable()),
+ ConsistencyLevel.LOCAL_ONE);
+
+ try
+ {
+ simpleClient.executePrepared(prepUpdate,
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(30),
+ Int32Serializer.instance.serialize(10)),
+ ConsistencyLevel.LOCAL_ONE);
+ fail();
+ }
+ catch (RuntimeException re)
+ {
+ assertTrue(re.getCause() instanceof PreparedQueryNotFoundException);
+ // the prepared statement has been removed from the pstmt cache, need to re-prepare it
+ // only prepare the statement on the server side but don't set the variable
+ simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?",
+ keyspace(), currentTable()));
+ }
+ try
+ {
+ simpleClient.executePrepared(prepSelect,
+ Arrays.asList(Int32Serializer.instance.serialize(1)),
+ ConsistencyLevel.LOCAL_ONE);
+ fail();
+ }
+ catch (RuntimeException re)
+ {
+ assertTrue(re.getCause() instanceof PreparedQueryNotFoundException);
+ // the prepared statement has been removed from the pstmt cache, need to re-prepare it
+ // only prepare the statement on the server side but don't set the variable
+ simpleClient.prepare(String.format("SELECT * FROM %s.%s WHERE pk = ?",
+ keyspace(), currentTable()));
+ }
+
+ // This is a _successful_ LWT update
+ verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+ prepUpdate,
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(30),
+ Int32Serializer.instance.serialize(10)),
+ Arrays.asList("[applied]"),
+ Arrays.asList(BooleanSerializer.instance.serialize(true)));
+
+ // Re-assign prepSelect here, as the resultset metadata changed to submit the updated
+ // resultset-metadata-ID in the next SELECT. This behavior does not apply to LWT statements.
+ prepSelect = verifyMetadataFlagsWithLWTsSelect(simpleClient,
+ prepSelect,
+ Arrays.asList("pk", "v1", "v2", "v3"),
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(30),
+ null),
+ EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC,
+ org.apache.cassandra.cql3.ResultSet.Flag.METADATA_CHANGED));
+
+ // This is an _unsuccessful_ LWT update (as the condition fails)
+ verifyMetadataFlagsWithLWTsUpdate(simpleClient,
+ prepUpdate,
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(30),
+ Int32Serializer.instance.serialize(10)),
+ Arrays.asList("[applied]", "v1"),
+ Arrays.asList(BooleanSerializer.instance.serialize(false),
+ Int32Serializer.instance.serialize(1)));
+
+ verifyMetadataFlagsWithLWTsSelect(simpleClient,
+ prepSelect,
+ Arrays.asList("pk", "v1", "v2", "v3"),
+ Arrays.asList(Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(1),
+ Int32Serializer.instance.serialize(30),
+ null),
+ EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC));
+ }
+ }
+
+ private ResultMessage.Prepared verifyMetadataFlagsWithLWTsSelect(SimpleClient simpleClient,
+ ResultMessage.Prepared prepSelect,
+ List<String> columnNames,
+ List<ByteBuffer> expectedRow,
+ EnumSet<org.apache.cassandra.cql3.ResultSet.Flag> expectedFlags)
+ {
+ ResultMessage result = simpleClient.executePrepared(prepSelect,
+ Collections.singletonList(Int32Serializer.instance.serialize(1)),
+ ConsistencyLevel.LOCAL_ONE);
+ ResultMessage.Rows rows = (ResultMessage.Rows) result;
+ EnumSet<org.apache.cassandra.cql3.ResultSet.Flag> resultFlags = rows.result.metadata.getFlags();
+ assertEquals(expectedFlags,
+ resultFlags);
+ assertEquals(columnNames.size(),
+ rows.result.metadata.getColumnCount());
+ assertEquals(columnNames,
+ rows.result.metadata.names.stream().map(cs -> cs.name.toString()).collect(Collectors.toList()));
+ assertEquals(1,
+ rows.result.size());
+ assertEquals(expectedRow,
+ rows.result.rows.get(0));
+
+ if (resultFlags.contains(org.apache.cassandra.cql3.ResultSet.Flag.METADATA_CHANGED))
+ prepSelect = prepSelect.withResultMetadata(rows.result.metadata);
+ return prepSelect;
+ }
+
+ private void verifyMetadataFlagsWithLWTsUpdate(SimpleClient simpleClient,
+ ResultMessage.Prepared prepUpdate,
+ List<ByteBuffer> params,
+ List<String> columnNames,
+ List<ByteBuffer> expectedRow)
+ {
+ ResultMessage result = simpleClient.executePrepared(prepUpdate,
+ params,
+ ConsistencyLevel.LOCAL_ONE);
+ ResultMessage.Rows rows = (ResultMessage.Rows) result;
+ EnumSet<org.apache.cassandra.cql3.ResultSet.Flag> resultFlags = rows.result.metadata.getFlags();
+ assertEquals(EnumSet.of(org.apache.cassandra.cql3.ResultSet.Flag.GLOBAL_TABLES_SPEC),
+ resultFlags);
+ assertEquals(columnNames.size(),
+ rows.result.metadata.getColumnCount());
+ assertEquals(columnNames,
+ rows.result.metadata.names.stream().map(cs -> cs.name.toString()).collect(Collectors.toList()));
+ assertEquals(1,
+ rows.result.size());
+ assertEquals(expectedRow,
+ rows.result.rows.get(0));
+ }
+
+ @Test
public void testPrepareWithLWT() throws Throwable
{
testPrepareWithLWT(ProtocolVersion.V4);
testPrepareWithLWT(ProtocolVersion.V5);
}
-
private void testPrepareWithLWT(ProtocolVersion version) throws Throwable
{
Session session = sessionNet(version);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org